Chapter 1 Requirements Analysis and Implementation Approach
1.1 Real-Time Data Warehouse Layers
As discussed when the real-time data warehouse concept was introduced, the main purpose of building one is to increase the reusability of computed data. When a new statistics requirement arrives, we do not have to start from the raw data every time; instead we keep processing from intermediate results. Here we read user behavior logs and business data from the Kafka ODS layer, apply simple processing, and write the results back to Kafka as the DWD layer.
1.2 Responsibilities of Each Layer
Layer | Data description | Compute tool | Storage
ODS   | Raw data: logs and business data | Log server, Maxwell | Kafka
DWD   | Data routed into separate streams per data object, e.g. orders, page visits | Flink | Kafka
DWM   | Further processing of selected data objects, e.g. unique visitors, bounce behavior; still detail-level data, with dimension fields denormalized in (wide tables) | Flink | Kafka
DIM   | Dimension data | Flink | HBase
DWS   | Multiple fact streams lightly aggregated by a dimension theme into theme-oriented wide tables | Flink | ClickHouse
ADS   | Data in ClickHouse filtered and aggregated as needed for visualization | ClickHouse SQL | Visualization
Chapter 2 Setting Up the Flink Compute Environment
2.1 Create the real-time compute module
2.2 Add the required dependencies
<properties> <flink.version>1.13.1</flink.version> <scala.binary.version>2.12</scala.binary.version> <hadoop.version>3.1.3</hadoop.version> <slf4j.version>1.7.30</slf4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.14.0</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.16</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency> <!--commons-beanutils是Apache開源組織提供的用於操作JAVA BEAN的工具包。 使用commons-beanutils,我們可以很方便的對bean對象的屬性進行操作--> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.9.3</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.4.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.1</version> 
<executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers combine.children="append"> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
2.3 Add log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
2.4 Create the package structure
Package | Purpose
app     | Flink jobs that produce each layer's data
bean    | Data objects (POJOs)
common  | Shared constants
util    | Utility classes
Chapter 3 DWD Layer: User Behavior Logs
The log data collected earlier is already stored in Kafka as the ODS layer for logs. The log data read from the Kafka ODS layer falls into three categories: page logs, start (app launch) logs, and display (exposure) logs. Although all three are user behavior data, their structures are completely different, so they must be split and handled separately. The split logs are written back to different Kafka topics, forming the DWD layer for logs.
In the implementation below, start logs are emitted to the main stream, while page logs and display logs are emitted to their own side-output streams.
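This routing relies on Flink's side-output mechanism. Below is a minimal, self-contained sketch of that pattern, using hypothetical String payloads and a made-up SideOutputSketch class instead of the project's JSON records; the real implementation appears in section 3.3.4.2.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Anonymous subclasses keep the generic type information of the tags.
        final OutputTag<String> pageTag = new OutputTag<String>("page") {};
        final OutputTag<String> displayTag = new OutputTag<String>("display") {};

        // Hypothetical input standing in for the three kinds of log records.
        DataStream<String> logs = env.fromElements("start:...", "page:...", "display:...");

        // Start logs stay in the main stream; the other two kinds go to side outputs.
        SingleOutputStreamOperator<String> startStream = logs.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.startsWith("start")) {
                    out.collect(value);             // main stream
                } else if (value.startsWith("page")) {
                    ctx.output(pageTag, value);     // page side output
                } else {
                    ctx.output(displayTag, value);  // display side output
                }
            }
        });

        startStream.print("start");
        startStream.getSideOutput(pageTag).print("page");
        startStream.getSideOutput(displayTag).print("display");

        env.execute("side-output-sketch");
    }
}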
3.1 Log Formats
1) Page log format
{ "common":{ "ar":"110000", "ba":"Xiaomi", "ch":"xiaomi", "is_new":"0", "md":"Xiaomi 9", "mid":"mid_20", "os":"Android 11.0", "uid":"47", "vc":"v2.1.134" }, "page":{ "during_time":13968, "last_page_id":"home", "page_id":"search" }, "ts":1614575952000 }
2) Start log format
{ "common":{ "ar":"110000", "ba":"iPhone", "ch":"Appstore", "is_new":"0", "md":"iPhone 8", "mid":"mid_19", "os":"iOS 13.3.1", "uid":"50", "vc":"v2.1.134" }, "start":{ "entry":"notice", "loading_time":9286, "open_ad_id":15, "open_ad_ms":6825, "open_ad_skip_ms":0 }, "ts":1614575950000 }
3) Display log format
{ "common":{ "ar":"110000", "ba":"iPhone", "ch":"Appstore", "is_new":"0", "md":"iPhone 8", "mid":"mid_19", "os":"iOS 13.3.1", "uid":"50", "vc":"v2.1.134" }, "displays":[ { "display_type":"activity", "item":"2", "item_type":"activity_id", "order":1, "pos_id":4 }, { "display_type":"activity", "item":"2", "item_type":"activity_id", "order":2, "pos_id":4 }, { "display_type":"promotion", "item":"4", "item_type":"sku_id", "order":3, "pos_id":5 }, { "display_type":"query", "item":"6", "item_type":"sku_id", "order":4, "pos_id":1 }, { "display_type":"promotion", "item":"3", "item_type":"sku_id", "order":5, "pos_id":5 }, { "display_type":"query", "item":"2", "item_type":"sku_id", "order":6, "pos_id":2 }, { "display_type":"query", "item":"7", "item_type":"sku_id", "order":7, "pos_id":3 }, { "display_type":"query", "item":"3", "item_type":"sku_id", "order":8, "pos_id":4 }, { "display_type":"query", "item":"9", "item_type":"sku_id", "order":9, "pos_id":1 }, { "display_type":"promotion", "item":"3", "item_type":"sku_id", "order":10, "pos_id":5 }, { "display_type":"query", "item":"8", "item_type":"sku_id", "order":11, "pos_id":2 } ], "page":{ "during_time":8319, "page_id":"home" }, "ts":1614575950000 }
3.2 Main Tasks
1) Identify new vs. returning visitors
The client itself tags users as new or returning, but the flag is not accurate enough, so it is re-confirmed in the real-time computation (no business logic is involved; this is purely a state-based confirmation).
2) Split the data
3) Write the different data to different Kafka topics (the DWD-layer data)
3.3 Implementation Code
3.3.1 Kafka utility classes
1) FlinkSourceUtil
package com.yuange.flinkrealtime.util;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author 袁哥
 * @date 2021/7/28 18:42
 */
public class FlinkSourceUtil {
    public static FlinkKafkaConsumer<String> getKafkaSource(String groupId, String topic) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        properties.setProperty("group.id", groupId);
        // If this consumer group has no committed offset for the topic, start from this position;
        // if a committed offset exists, resume from it.
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("isolation.level", "read_committed");
        return new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }
}
2) FlinkSinkUtil
package com.yuange.flinkrealtime.util;

import lombok.SneakyThrows;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

/**
 * @author 袁哥
 * @date 2021/7/28 18:33
 */
public class FlinkSinkUtil {
    public static FlinkKafkaProducer<String> getKafkaSink(final String topic) {
        Properties conf = new Properties();
        conf.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
        conf.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");
        return new FlinkKafkaProducer<String>(
                "default",
                new KafkaSerializationSchema<String>() {
                    @SneakyThrows
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long timestamp) {
                        return new ProducerRecord<>(topic, null, s.getBytes("utf-8"));
                    }
                },
                conf,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }
}
3) YuangeCommonUtil
package com.yuange.flinkrealtime.util;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/28 18:48
 */
public class YuangeCommonUtil {
    public static <T> List<T> toList(Iterable<T> it) {
        List<T> list = new ArrayList<>();
        for (T t : it) {
            list.add(t);
        }
        return list;
    }
}
3.3.2 A BaseApp class that encapsulates consuming from Kafka
package com.yuange.flinkrealtime.app;

import com.yuange.flinkrealtime.util.FlinkSourceUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 袁哥
 * @date 2021/7/28 18:53
 */
public abstract class BaseAppV1 {
    public void init(int port, int p, String ck, String groupId, String topic) {
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", port);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment
                .getExecutionEnvironment(configuration)
                .setParallelism(p);
        environment.enableCheckpointing(5000);                    // interval between checkpoints, in milliseconds
        environment.setStateBackend(new HashMapStateBackend());   // state backend; checkpoint state is written to remote storage (HDFS)
        environment.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop162:8020/FlinkParent/ck/" + ck);  // checkpoint storage path
        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // checkpointing mode: exactly-once
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);  // at most one checkpoint in flight at a time
        environment.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // keep externalized checkpoints when the job is cancelled

        DataStreamSource<String> sourceStream = environment.addSource(FlinkSourceUtil.getKafkaSource(groupId, topic));
        run(environment, sourceStream);

        try {
            environment.execute(ck);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    protected abstract void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream);
}
3.3.3 Constants
package com.yuange.flinkrealtime.common;

/**
 * @author 袁哥
 * @date 2021/7/28 19:31
 */
public class Constant {
    public static final String TOPIC_ODS_LOG = "ods_log";
    public static final String TOPIC_ODS_DB = "ods_db";
    public static final String TOPIC_DWD_START = "dwd_start";
    public static final String TOPIC_DWD_PAGE = "dwd_page";
    public static final String TOPIC_DWD_DISPLAY = "dwd_display";
}
3.3.4 DWDLogApp implementation
3.3.4.1 Identifying new vs. returning visitors
1) Approach:
(1) To handle out-of-order data, use event-time semantics
(2) Key the stream by mid
(3) Apply a 5-second tumbling window
(4) Use state to record the timestamp of the first visit
(5) If the state is empty, the event with the smallest timestamp in the window is the first visit; all other events are not
(6) If the state is not empty, every event in the window is not a first visit
2) Implementation code
package com.yuange.flinkrealtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.yuange.flinkrealtime.app.BaseAppV1; import com.yuange.flinkrealtime.common.Constant; import com.yuange.flinkrealtime.util.YuangeCommonUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.time.Duration; import java.util.Comparator; import java.util.List; /** * @作者:袁哥 * @時間:2021/7/28 19:32 */ public class DwdLogApp extends BaseAppV1 { final String START_STREAM = "start"; final String PAGE_STREAM = "page"; final String DISPLAY_STREAM = "display"; public static void main(String[] args) { new DwdLogApp().init( 2001, //端口號 2, //並行度 "DwdLogApp", //檢查點的存放目錄名稱 "DwdLogApp", //消費者組id Constant.TOPIC_ODS_LOG //主題 ); } @Override protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream) { //業務 //一、對新老用戶進行確認 SingleOutputStreamOperator<JSONObject> validatedStream = distinguishNewOrOld(sourceStream); validatedStream.print(); } private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> sourceStream) { /** * 如何實現識別新老客戶? 
* 需要利用狀態 * 考慮數據的亂序: 使用事件時間, 加窗口 * */ //創建一個新的WatermarkStrategy來封裝水印策略 return sourceStream .map(JSON::parseObject) //將數據轉為JSON格式 .assignTimestampsAndWatermarks( WatermarkStrategy .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //創建水印策略,水印是周期性生成的。這種水印策略引入的延遲是周期間隔長度加上亂序界限 .withTimestampAssigner((element, recordTimestamp) -> element.getLong("ts")) //ts + 3秒 ) .keyBy(obj -> obj.getJSONObject("common").getString("mid")) //以設備id分組 .window(TumblingEventTimeWindows.of(Time.seconds(5))) //滾動事件窗口,每5秒統計一次數據 .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() { //定義一個狀態,以此來判斷用戶是否是新老用戶 ValueState<Long> firstWindowState; @Override public void open(Configuration parameters) throws Exception { firstWindowState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("firstWindowState", Long.class)); } @Override public void process(String key, Context context, Iterable<JSONObject> elements, Collector<JSONObject> out) throws Exception { //如果識別出來某個mid的第一個窗口 if (firstWindowState.value() == null){ //把時間戳最小的那個條記錄的is_new設置為1, 其他都為0 List<JSONObject> list = YuangeCommonUtil.toList(elements); list.sort(Comparator.comparing(o -> o.getLong("ts"))); //將JSON數據按ts排序 for (int i = 0; i < list.size(); i++) { JSONObject common = list.get(i).getJSONObject("common"); if (i == 0){ common.put("is_new","1"); //設置is_new為1,表示它是新用戶 firstWindowState.update(list.get(i).getLong("ts")); //更新狀態 }else { common.put("is_new","0"); //設置is_new為0,表示它是老用戶 } out.collect(list.get(i)); //將處理好的數據寫出到流中 } }else { //所有的用戶都是舊用戶, 所有的is_new全部設置為0 for (JSONObject element : elements) { element.getJSONObject("common").put("is_new","0"); out.collect(element); //將處理好的數據寫出到流中 } } } }); } }
3) Start DwdLogApp
4) Generate log data (nginx, Hadoop, ZooKeeper, Kafka, and the log server must be started first)
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar
5) Check the console; data is printed
3.3.4.2 Splitting the data
Based on their content, the logs are split into three categories: page logs, start logs, and display logs. In the implementation, start logs go to the main stream, while page logs and display logs go to the page and display side-output streams respectively.
1) Full code
package com.yuange.flinkrealtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONAware; import com.alibaba.fastjson.JSONObject; import com.yuange.flinkrealtime.app.BaseAppV1; import com.yuange.flinkrealtime.common.Constant; import com.yuange.flinkrealtime.util.FlinkSinkUtil; import com.yuange.flinkrealtime.util.YuangeCommonUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.time.Duration; import java.util.Comparator; import java.util.HashMap; import java.util.List; /** * @作者:袁哥 * @時間:2021/7/28 19:32 * 對日志數據進行分流, 寫入到dwd層(kafka) * 1. 對新老用戶進行確認 * 2. 對ods_log流進行分流(使用側輸出流) * 不同的數據放入不同的流 * 啟動日志 * 曝光日志 * 頁面日志 * 3. 把數據寫入到Kafka中 */ public class DwdLogApp extends BaseAppV1 { final String START_STREAM = "start"; final String PAGE_STREAM = "page"; final String DISPLAY_STREAM = "display"; public static void main(String[] args) { new DwdLogApp().init( 2001, //端口號 2, //並行度 "DwdLogApp", //檢查點的存放目錄名稱 "DwdLogApp", //消費者組id Constant.TOPIC_ODS_LOG //主題 ); } @Override protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> sourceStream) { //業務 //一、對新老用戶進行確認 SingleOutputStreamOperator<JSONObject> validatedStream = distinguishNewOrOld(sourceStream); // validatedStream.print(); //二、讓不同的日志進入不同的流 HashMap<String, DataStream<JSONObject>> threeStreams = splitSteam(validatedStream); //三、把數據寫入到kafka中 sendToKafka(threeStreams); } private void sendToKafka(HashMap<String, DataStream<JSONObject>> threeStreams) { threeStreams .get(START_STREAM) .map(JSONAware::toJSONString) .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_START)); threeStreams .get(PAGE_STREAM) .map(JSONAware::toJSONString) .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_PAGE)); threeStreams .get(DISPLAY_STREAM) .map(JSONAware::toJSONString) .addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_DISPLAY)); } private HashMap<String, DataStream<JSONObject>> splitSteam(SingleOutputStreamOperator<JSONObject> validatedStream) { /** * 把日志分成了3類: * 1. 啟動日志 主流 * 2. 頁面日志 側輸出流 * 3. 
曝光日志 側輸出流 * */ OutputTag<JSONObject> pageTag = new OutputTag<JSONObject>("page") { //page側輸出流 }; OutputTag<JSONObject> displayTag = new OutputTag<JSONObject>("display") { //display測輸出流 }; SingleOutputStreamOperator<JSONObject> startStream = validatedStream.process(new ProcessFunction<JSONObject, JSONObject>() { @Override public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception { JSONObject start = value.getJSONObject("start"); //獲取start日志 if (start != null) { //啟動日志放入主流 out.collect(value); } else { JSONObject page = value.getJSONObject("page"); if (page != null) { ctx.output(pageTag, value); //將數據存儲pageTag側輸出流 } JSONArray displays = value.getJSONArray("displays"); if (displays != null) { for (int i = 0; i < displays.size(); i++) { JSONObject display = displays.getJSONObject(i); // 在display中補充一些數據 // 1. 補充時間戳 display.put("ts", value.getLong("ts")); // 2. 補充一個page_id display.put("page_id", value.getJSONObject("page").getString("page_id")); // 3. 補充common中所有的字段 display.putAll(value.getJSONObject("common")); ctx.output(displayTag, display); //將處理好的數據存入display側輸出流 } } } } }); //將側輸出流轉化為DataStream DataStream<JSONObject> pageStream = startStream.getSideOutput(pageTag); DataStream<JSONObject> displayStream = startStream.getSideOutput(displayTag); //將流匯總到Map集合中 HashMap<String, DataStream<JSONObject>> map = new HashMap<>(); map.put(START_STREAM,startStream); map.put(PAGE_STREAM,pageStream); map.put(DISPLAY_STREAM,displayStream); return map; } private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> sourceStream) { /** * 如何實現識別新老客戶? * 需要利用狀態 * 考慮數據的亂序: 使用事件時間, 加窗口 * */ //創建一個新的WatermarkStrategy來封裝水印策略 return sourceStream .map(JSON::parseObject) //將數據轉為JSON格式 .assignTimestampsAndWatermarks( WatermarkStrategy .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //創建水印策略,水印是周期性生成的。這種水印策略引入的延遲是周期間隔長度加上亂序界限 .withTimestampAssigner((element, recordTimestamp) -> element.getLong("ts")) //ts + 3秒 ) .keyBy(obj -> obj.getJSONObject("common").getString("mid")) //以設備id分組 .window(TumblingEventTimeWindows.of(Time.seconds(5))) //滾動事件窗口,每5秒統計一次數據 .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() { //定義一個狀態,以此來判斷用戶是否是新老用戶 ValueState<Long> firstWindowState; @Override public void open(Configuration parameters) throws Exception { firstWindowState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("firstWindowState", Long.class)); } @Override public void process(String key, Context context, Iterable<JSONObject> elements, Collector<JSONObject> out) throws Exception { //如果識別出來某個mid的第一個窗口 if (firstWindowState.value() == null){ //把時間戳最小的那個條記錄的is_new設置為1, 其他都為0 List<JSONObject> list = YuangeCommonUtil.toList(elements); list.sort(Comparator.comparing(o -> o.getLong("ts"))); //將JSON數據按ts排序 for (int i = 0; i < list.size(); i++) { JSONObject common = list.get(i).getJSONObject("common"); if (i == 0){ common.put("is_new","1"); //設置is_new為1,表示它是新用戶 firstWindowState.update(list.get(i).getLong("ts")); //更新狀態 }else { common.put("is_new","0"); //設置is_new為0,表示它是老用戶 } out.collect(list.get(i)); //將處理好的數據寫出到流中 } }else { //所有的用戶都是舊用戶, 所有的is_new全部設置為0 for (JSONObject element : elements) { element.getJSONObject("common").put("is_new","0"); out.collect(element); //將處理好的數據寫出到流中 } } } }); } }
2) Generate data
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar
3) Start Kafka console consumers
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic dwd_start
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic dwd_page
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic dwd_display
4) Check that the consumers receive data
Chapter 4 DWD Layer: Business Data
Changes to the business data can be captured with Maxwell, but Maxwell writes everything into a single topic. That data includes both fact data and dimension data, which is inconvenient for later processing. So this job reads from the Kafka business-data ODS layer, processes it, saves the dimension data to HBase, and writes the fact data back to Kafka as the DWD layer for business data.
4.1 Main Tasks
4.1.1 Receive the Kafka data and filter out empty records
4.1.2 Implement dynamic stream routing
Because Maxwell writes all data into one topic, the tables have to be separated for later processing. Each table has its own characteristics: some are dimension tables, some are fact tables, and some act as a fact table or a dimension table depending on the situation. In real-time computation, dimension data is usually written to a store that supports lookups by primary key, such as HBase, Redis, or MySQL.
Fact data is usually kept in the stream for further processing, eventually forming wide tables. But how does the Flink job know which tables are dimension tables and which are fact tables, and which columns should be collected for each?
Such configuration is not a good fit for a static config file: every time the business side adds a table, the config would have to be changed and the job restarted. What is needed is a dynamic configuration that is stored persistently and whose changes the real-time job can pick up automatically.
There are two ways to implement this: store the configuration in ZooKeeper and watch for changes, or store it in a MySQL table.
The second option is chosen here, mainly because initializing and maintaining configuration data with SQL in MySQL is convenient. Polling is slightly less timely, but configuration changes are infrequent. The resulting flow: read the config table with Flink CDC, broadcast it, and connect it with the business-data stream.
4.1.3 Write each routed stream to its table or topic
Fact data is written to Kafka topics; dimension data is written to HBase tables.
4.2 Implementation Code
4.2.1 Design the dynamic configuration table
1) Create the configuration database and initialize the data
CREATE DATABASE `flink_realtime` CHARACTER SET utf8 COLLATE utf8_general_ci;
USE flink_realtime;
source /opt/software/mock/mock_db/table_process_init.sql;
2) Configuration-table entity class
package com.yuange.flinkrealtime.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 袁哥
 * @date 2021/7/29 11:10
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class TableProcess {
    // sink-type constants used for dynamic routing
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";

    private String source_table;   // source table
    private String operate_type;   // operation type: insert, update, delete
    private String sink_type;      // sink type: hbase or kafka
    private String sink_table;     // target table (or Kafka topic)
    private String sink_columns;   // columns to sink
    private String sink_pk;        // primary-key column
    private String sink_extend;    // extra clause used when creating the table
}
4.2.2 Approach
1) Business data: MySQL -> Maxwell -> Kafka -> Flink
2) Dynamic configuration table: MySQL -> Flink SQL CDC
3) Broadcast the configuration stream and connect it with the business-data stream, so that the sink of each business record is controlled dynamically (a sketch of this pattern follows)
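Step 3 is Flink's broadcast-state pattern. The following is a minimal sketch under simplified assumptions: plain String records and config rows instead of the project's JSONObject and TableProcess types, a plain BroadcastProcessFunction instead of the keyed variant used in the real DwdDbApp, and a hypothetical keyOf helper standing in for the source_table:operate_type key.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the broadcast-state pattern: the config stream is broadcast and kept in
// broadcast state, and each business record looks its routing rule up there.
public class BroadcastConnectSketch {
    public static DataStream<String> connect(DataStream<String> businessStream,
                                              DataStream<String> configStream) {
        MapStateDescriptor<String, String> configDesc =
                new MapStateDescriptor<>("configState", String.class, String.class);

        BroadcastStream<String> broadcastConfig = configStream.broadcast(configDesc);

        return businessStream
                .connect(broadcastConfig)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // look up the routing rule for this record; drop it if none is configured
                        String rule = ctx.getBroadcastState(configDesc).get(keyOf(value));
                        if (rule != null) {
                            out.collect(value + " -> " + rule);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String config, Context ctx, Collector<String> out) throws Exception {
                        // every new or changed config row updates the broadcast state
                        ctx.getBroadcastState(configDesc).put(keyOf(config), config);
                    }
                });
    }

    // hypothetical helper: derive something like "source_table:operate_type" from a record
    private static String keyOf(String record) {
        return record.split(":")[0];
    }
}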
4.2.3 Reading the dynamic configuration table
1) Introduction to Flink SQL CDC
CDC (Change Data Capture) is a fairly broad concept: anything that can capture changed data can be called CDC. There are two main kinds in practice: query-based CDC, which polls the source with queries (higher latency, hard to capture deletes), and log-based CDC, which reads the database's change log (low latency, captures every insert, update, and delete).
2) The traditional data synchronization architecture (what we have used so far):
Drawback: too many components on the collection side make maintenance cumbersome.
The improved architecture:
The Flink community developed the flink-cdc-connectors component, a source connector that can read full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL.
It is open source: https://github.com/ververica/flink-cdc-connectors
3) Modify the MySQL configuration to enable binlog monitoring of the flink_realtime database
(1) Edit the configuration file
sudo vim /etc/my.cnf
[mysqld]
server-id=1
# binlog file prefix
log-bin=mysql-bin
# replication format
binlog_format=row
# databases to log
binlog-do-db=flinkdb
binlog-do-db=flink_realtime
(2) Restart MySQL
sudo systemctl restart mysqld
(3) Confirm that MySQL started successfully
sudo systemctl status mysqld
# or
ps -ef | grep mysqld
(4) Make sure Maxwell does not capture this database; add the following filter to the Maxwell configuration
vim /opt/module/maxwell-1.27.1/config.properties
filter=exclude:flink_realtime.*
4) Add the CDC dependency
<!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.4.0</version>
</dependency>
5) Implementation code
package com.yuange.flinkrealtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.yuange.flinkrealtime.app.BaseAppV1; import com.yuange.flinkrealtime.bean.TableProcess; import com.yuange.flinkrealtime.common.Constant; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; /** * @作者:袁哥 * @時間:2021/7/29 11:33 * 業務數據使用maxwell采集到kafka形成ods層的業務數據,然后再由Flink來接收處理ods層的數據 * 配置數據使用FlinkCDC直接從MySQL中讀取,並且進行實時監控,只要配置表中的數據發生變動,FlinkCDC會感知到並進行相應的處理 */ public class DwdDbApp extends BaseAppV1 { public static void main(String[] args) { new DwdDbApp().init( 2002, //端口號 1, //並行度 "DwdDbApp", //檢查點存放在HDFS上的目錄名稱 "DwdDbApp", //消費者組 Constant.TOPIC_ODS_DB //消費的主題 ); } @Override protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) { //讀取配置表的數據, 得到一個配置流(cdc) SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment); } private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) { /** * 第一次讀取全部數據 * 以后監控mysql中這個配置表的數據的更新 * */ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment); tableEnvironment.executeSql("create table `table_process` (\n" + " `source_table` string,\n" + " `operate_type` string,\n" + " `sink_type` string,\n" + " `sink_table` string,\n" + " `sink_columns` string,\n" + " `sink_pk` string,\n" + " `sink_extend` string,\n" + " primary key (`source_table`,`operate_type`) not enforced\n" + ")with(" + " 'connector' = 'mysql-cdc', " + " 'hostname' = 'hadoop162', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = 'aaaaaa', " + " 'database-name' = 'flink_realtime', " + " 'table-name' = 'table_process', " + " 'debezium.snapshot.mode' = 'initial', " + ")" ); /** * initial: 啟動的時候會讀取表中所有的數據, 放在內存中, 全部數據讀取完成之后, 會使用binlog來監控mysql的變化 * never: 只用binlog來監控mysql的變化 */ Table table_process = tableEnvironment.from("table_process"); return tableEnvironment .toRetractStream(table_process, TableProcess.class) //將table轉化為可以新增和變化的dataStream .filter(t -> t.f0) //過濾出變化的數據 .map(t -> t.f1); //返回數據:TableProcess } }
4.2.4 Reading and ETL-ing the business data
package com.yuange.flinkrealtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.yuange.flinkrealtime.app.BaseAppV1; import com.yuange.flinkrealtime.bean.TableProcess; import com.yuange.flinkrealtime.common.Constant; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; /** * @作者:袁哥 * @時間:2021/7/29 11:33 * 業務數據使用maxwell采集到kafka形成ods層的業務數據,然后再由Flink來接收處理ods層的數據 * 配置數據使用FlinkCDC直接從MySQL中讀取,並且進行實時監控,只要配置表中的數據發生變動,FlinkCDC會感知到並進行相應的處理 */ public class DwdDbApp extends BaseAppV1 { public static void main(String[] args) { new DwdDbApp().init( 2002, //端口號 1, //並行度 "DwdDbApp", //檢查點存放在HDFS上的目錄名稱 "DwdDbApp", //消費者組 Constant.TOPIC_ODS_DB //消費的主題 ); } @Override protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) { //1. 對數據進行etl SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream); //2. 讀取配置表的數據, 得到一個配置流(cdc) SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment); } private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) { /** * 第一次讀取全部數據 * 以后監控mysql中這個配置表的數據的更新 * */ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment); tableEnvironment.executeSql("create table `table_process` (\n" + " `source_table` string,\n" + " `operate_type` string,\n" + " `sink_type` string,\n" + " `sink_table` string,\n" + " `sink_columns` string,\n" + " `sink_pk` string,\n" + " `sink_extend` string,\n" + " primary key (`source_table`,`operate_type`) not enforced\n" + ")with(" + " 'connector' = 'mysql-cdc', " + " 'hostname' = 'hadoop162', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = 'aaaaaa', " + " 'database-name' = 'flink_realtime', " + " 'table-name' = 'table_process', " + " 'debezium.snapshot.mode' = 'initial', " + ")" ); /** * initial: 啟動的時候會讀取表中所有的數據, 放在內存中, 全部數據讀取完成之后, 會使用binlog來監控mysql的變化 * never: 只用binlog來監控mysql的變化 */ Table table_process = tableEnvironment.from("table_process"); return tableEnvironment .toRetractStream(table_process, TableProcess.class) //將table轉化為可以新增和變化的dataStream .filter(t -> t.f0) //過濾出變化的數據 .map(t -> t.f1); //返回數據:TableProcess } private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) { return dataStream .map(JSON::parseObject) //將流中的數據轉為JSON格式 .filter(obj -> obj.getString("database") != null && obj.getString("table") != null && obj.getString("type") != null && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type"))) && obj.getString("data") != null && obj.getString("data").length() > 10 ); } }
4.2.5 Connecting the business-data stream with the dynamic configuration stream
1) Broadcast the configuration stream, connect it with the data stream, then route the data: fact-table data stays in the main stream, HBase-bound data goes to a side-output stream
package com.yuange.flinkrealtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.yuange.flinkrealtime.app.BaseAppV1; import com.yuange.flinkrealtime.bean.TableProcess; import com.yuange.flinkrealtime.common.Constant; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.util.Arrays; import java.util.List; /** * @作者:袁哥 * @時間:2021/7/29 11:33 * 業務數據使用maxwell采集到kafka形成ods層的業務數據,然后再由Flink來接收處理ods層的數據 * 配置數據使用FlinkCDC直接從MySQL中讀取,並且進行實時監控,只要配置表中的數據發生變動,FlinkCDC會感知到並進行相應的處理 */ public class DwdDbApp extends BaseAppV1 { public static void main(String[] args) { new DwdDbApp().init( 2002, //端口號 1, //並行度 "DwdDbApp", //檢查點存放在HDFS上的目錄名稱 "DwdDbApp", //消費者組 Constant.TOPIC_ODS_DB //消費的主題 ); } @Override protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) { //1. 對數據進行etl SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream); //2. 讀取配置表的數據, 得到一個配置流(cdc) SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment); //3. 
數據流和配置流進行connect,返回值就是:一個JSONObject數據對應一個TableProcess配置 SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams = connectStreams(etledStream, tableProcessStream); //4.每條數據根據他的配置, 進行動態分流 Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> kafkaHbaseStreams = dynamicSplit(connectedStreams); kafkaHbaseStreams.f0.print("kafka"); kafkaHbaseStreams.f1.print("hbase"); } private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams) { //側輸出流 OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") { }; SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = connectedStreams.process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() { @Override public void processElement(Tuple2<JSONObject, TableProcess> value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception { //只取出JSONObject中的data數據,相當於做了一次過濾,我們只需要data數據 Tuple2<JSONObject, TableProcess> data = Tuple2.of(value.f0.getJSONObject("data"), value.f1); //其實這個地方應該根據sink_cloumns的值進行一個過濾, 只保留需要sink的字段 filterColumns(data); /** * 從TableProcess配置中獲取每條數據應該去往哪里:getSink_type * hbase * kafka * */ String sink_type = value.f1.getSink_type(); if (TableProcess.SINK_TYPE_KAFKA.equals(sink_type)) { //如果這條數據中的配置顯示是kafka,則將數據發往kafka(使用主流) //事實數據較多,使用主流發往kafka out.collect(data); } else if (TableProcess.SINK_TYPE_HBASE.equals(sink_type)) { //如果這條數據中的配置顯示是hbase,則將數據發往hbase(使用側輸出流) //因為維度數據較少,故使用側輸出流發往hbase ctx.output(hbaseTag, data); } } private void filterColumns(Tuple2<JSONObject, TableProcess> data) { JSONObject jsonObject = data.f0; //將配置表中的配個字段切分開來,放到一個List集合中 /* id,activity_name,activity_type,activity_desc,start_time,end_time,create_time id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level */ List<String> columns = Arrays.asList(data.f1.getSink_columns().split(",")); //如果columns集合中沒有對應的key值,那么JSONObject中的這條數據就刪除它 jsonObject.keySet().removeIf(key -> !columns.contains(key)); } }); //將側輸出流轉換為DataStream DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag); return Tuple2.of(kafkaStream,hbaseStream); } private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(SingleOutputStreamOperator<JSONObject> etledStream, SingleOutputStreamOperator<TableProcess> tableProcessStream) { MapStateDescriptor<String, TableProcess> tpStateDesc = new MapStateDescriptor<>("tpStateDesc", String.class, TableProcess.class); /* 動態分流 目標: 應該得到一個新的流, 新的流存儲的數據類型應該是一個二維元組 <JSONObject, TableProcess> 碰到一條數據流中的數據, 找一個TableProcess key: source_table:operate_type value: TableProcess */ //1.將配置流做成廣播流 BroadcastStream<TableProcess> tpBroadcastStream = tableProcessStream.broadcast(tpStateDesc); //2.廣播流與數據流進行connect return etledStream.keyBy(obj -> obj.getString("table")) //以table分組,然后將每個table與廣播流connect .connect(tpBroadcastStream) .process(new KeyedBroadcastProcessFunction<String, JSONObject, TableProcess, Tuple2<JSONObject,TableProcess>>() { @Override public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<Tuple2<JSONObject,TableProcess>> out) throws Exception { //獲取廣播狀態,因為所有的配置信息都保存在了廣播狀態中 ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc); 
//拼接table:type作為key值 String key = value.getString("table") + ":" + value.getString("type").replaceAll("bootstrap-", ""); //根據在數據流中拼接出來的key值,從廣播狀態中取出對應的配置信息(即給每一條數據找一個TableProcess配置) TableProcess tableProcess = broadcastState.get(key); //如果tableProcess是null,證明這條數據不需要后面處理 if (tableProcess != null){ out.collect(Tuple2.of(value,tableProcess)); //將Json數據和配置數據返回(一個table對應一個配置信息) } } @Override public void processBroadcastElement(TableProcess value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception { //把來的每條配置都寫入到廣播狀態中 BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc); //從上下文環境中獲取廣播狀態 //拼接key,以保存到廣播狀態中 /* Source_table Operate_type activity_info insert activity_info update activity_rule insert activity_rule update activity_sku insert activity_sku update */ String key = value.getSource_table() + ":" + value.getOperate_type(); //一條記錄就是一個配置信息 broadcastState.put(key,value); } }); } private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) { /** * 第一次讀取全部數據 * 以后監控mysql中這個配置表的數據的更新 * */ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment); tableEnvironment.executeSql("create table `table_process` (\n" + " `source_table` string,\n" + " `operate_type` string,\n" + " `sink_type` string,\n" + " `sink_table` string,\n" + " `sink_columns` string,\n" + " `sink_pk` string,\n" + " `sink_extend` string,\n" + " primary key (`source_table`,`operate_type`) not enforced\n" + ")with(" + " 'connector' = 'mysql-cdc', " + " 'hostname' = 'hadoop162', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = 'aaaaaa', " + " 'database-name' = 'flink_realtime', " + " 'table-name' = 'table_process', " + " 'debezium.snapshot.mode' = 'initial' " + ")" ); /** * initial: 啟動的時候會讀取表中所有的數據, 放在內存中, 全部數據讀取完成之后, 會使用binlog來監控mysql的變化 * never: 只用binlog來監控mysql的變化 */ Table table_process = tableEnvironment.from("table_process"); return tableEnvironment .toRetractStream(table_process, TableProcess.class) //將table轉化為可以新增和變化的dataStream .filter(t -> t.f0) //過濾出變化的數據 .map(t -> t.f1); //返回數據:TableProcess } private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) { return dataStream .map(JSON::parseObject) //將流中的數據轉為JSON格式 .filter(obj -> obj.getString("database") != null && obj.getString("table") != null && obj.getString("type") != null && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type"))) && obj.getString("data") != null && obj.getString("data").length() > 10 ); } }
2) Start Hadoop (for checkpoint persistence)
hadoop.sh start
3) Start ZooKeeper
zk start
4) Start Kafka
kafka.sh start
5) Start DwdDbApp, ready to receive data from Kafka
6) Start Maxwell to monitor business-data changes in MySQL in real time and push them to Kafka (historical data can also be loaded into Kafka with maxwell-bootstrap)
/opt/module/maxwell-1.27.1/bin/maxwell --config /opt/module/maxwell-1.27.1/config.properties --daemon
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1
7) Check the output: the console prints records (a large volume of data may scroll the Kafka main-stream output out of view, since the console buffer is limited)
8) Generate business data to simulate inserts
cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar
4.2.6 Sinking the data to the right place
1) Sink to HBase
(1) Add the Phoenix dependencies
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1-jre</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
(2) Add two constants to Constant
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";   // (Ctrl+N in IDEA: search for a class globally)
public static final String PHOENIX_URL = "jdbc:phoenix:hadoop162,hadoop163,hadoop164:2181";
(3) Create a PhoenixSink that extends RichSinkFunction and writes the data to HBase
package com.yuange.flinkrealtime.sink; import com.alibaba.fastjson.JSONObject; import com.yuange.flinkrealtime.bean.TableProcess; import com.yuange.flinkrealtime.util.JdbcUtil; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; /** * @作者:袁哥 * @時間:2021/7/30 23:25 */ public class PhoenixSink extends RichSinkFunction<Tuple2<JSONObject, TableProcess>> { Connection conn; ValueState<String> tableCreateState; @Override public void open(Configuration parameters) throws Exception { //先加載驅動, 很多情況不是必須. //大部分常用的數據庫會根據url自動選擇合適的driver //Phoenix 驅動有些時候需要手動加載一下 conn = JdbcUtil.getPhoenixConnection(); //創建一個狀態來管理table tableCreateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("tableCreateState", String.class)); } @Override public void invoke(Tuple2<JSONObject, TableProcess> value, Context context) throws Exception { // 1. 檢測表, 如果表不存在就需要在Phoenix中新建表 checkTable(value); // 2. 再把數據寫入到phoenix中 writeToPhoenix(value); } private void writeToPhoenix(Tuple2<JSONObject, TableProcess> value) throws SQLException { JSONObject data = value.f0; TableProcess tp = value.f1; // upsert into user(id, name, age) values(?,?,?) //拼接SQL語句 StringBuilder insertSql = new StringBuilder(); insertSql .append("upsert into ") .append(tp.getSink_table()) .append("(") //id,activity_name,activity_type,activity_desc,start_time,end_time,create_time .append(tp.getSink_columns()) .append(")values(") //把非,部分替換為? .append(tp.getSink_columns().replaceAll("[^,]+","?")) .append(")"); PreparedStatement ps = conn.prepareStatement(insertSql.toString()); //給ps中的占位符賦值 String[] columnNames = tp.getSink_columns().split(","); for (int i = 0; i < columnNames.length; i++) { //從JSONObject數據中取出對應字段的值 Object str = data.getString(columnNames[i]); ps.setString(i + 1,str == null ? "" : str.toString()); } ps.execute(); conn.commit(); ps.close(); } private void checkTable(Tuple2<JSONObject, TableProcess> value) throws IOException, SQLException { if (tableCreateState.value() == null){ // 執行sql語句 create table if not exists user(id varchar, age varchar ) TableProcess tp = value.f1; // 拼接sql語句 StringBuilder createSql = new StringBuilder(); createSql .append("create table if not exists ") .append(tp.getSink_table()) .append("(") .append(tp.getSink_columns().replaceAll(","," varchar,")) .append(" varchar, constraint pk primary key(") .append(tp.getSink_pk() == null ? "id" : tp.getSink_pk()) .append("))") .append(tp.getSink_extend() == null ? "" : tp.getSink_extend()); PreparedStatement ps = conn.prepareStatement(createSql.toString()); ps.execute(); conn.commit(); ps.close(); //更新狀態 tableCreateState.update(tp.getSink_table()); } } }
(4) Create a JdbcUtil to obtain a Phoenix connection
package com.yuange.flinkrealtime.util;

import com.yuange.flinkrealtime.common.Constant;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * @author 袁哥
 * @date 2021/7/30 23:30
 */
public class JdbcUtil {
    public static Connection getPhoenixConnection() throws ClassNotFoundException, SQLException {
        Class.forName(Constant.PHOENIX_DRIVER);
        return DriverManager.getConnection(Constant.PHOENIX_URL);
    }
}
(5) Add a getHbaseSink() method to FlinkSinkUtil. Its return type is SinkFunction; since PhoenixSink extends RichSinkFunction, which implements SinkFunction, we can simply return a PhoenixSink.
public static SinkFunction<Tuple2<JSONObject, TableProcess>> getHbaseSink() {
    return new PhoenixSink();
}
(6) Call FlinkSinkUtil.getHbaseSink() in DwdDbApp to write the data to HBase
package com.yuange.flinkrealtime.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.yuange.flinkrealtime.app.BaseAppV1; import com.yuange.flinkrealtime.bean.TableProcess; import com.yuange.flinkrealtime.common.Constant; import com.yuange.flinkrealtime.util.FlinkSinkUtil; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.util.Arrays; import java.util.List; /** * @作者:袁哥 * @時間:2021/7/29 11:33 * 業務數據使用maxwell采集到kafka形成ods層的業務數據,然后再由Flink來接收處理ods層的數據 * 配置數據使用FlinkCDC直接從MySQL中讀取,並且進行實時監控,只要配置表中的數據發生變動,FlinkCDC會感知到並進行相應的處理 */ public class DwdDbApp extends BaseAppV1 { public static void main(String[] args) { new DwdDbApp().init( 2002, //端口號 1, //並行度 "DwdDbApp", //檢查點存放在HDFS上的目錄名稱 "DwdDbApp", //消費者組 Constant.TOPIC_ODS_DB //消費的主題 ); } @Override protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) { //1. 對數據進行etl SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream); //2. 讀取配置表的數據, 得到一個配置流(cdc) SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment); //3. 數據流和配置流進行connect,返回值就是:一個JSONObject數據對應一個TableProcess配置 SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams = connectStreams(etledStream, tableProcessStream); //4.每條數據根據他的配置, 進行動態分流 Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> kafkaHbaseStreams = dynamicSplit(connectedStreams); // kafkaHbaseStreams.f0.print("kafka"); // kafkaHbaseStreams.f1.print("hbase"); //5.維度表寫入到hbase sendToHbase(kafkaHbaseStreams.f1); } private void sendToHbase(DataStream<Tuple2<JSONObject, TableProcess>> stream) { /** * 向hbase(Phoenix)寫入數據的時候, 表不會自動創建 * 1. 先創建表 動態創建 * 2. 
再寫入 * */ stream.keyBy(t -> t.f1.getSink_table()) //按照配置表中的Sink_table分組 .addSink(FlinkSinkUtil.getHbaseSink()); } private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams) { //側輸出流 OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") { }; SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = connectedStreams.process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() { @Override public void processElement(Tuple2<JSONObject, TableProcess> value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception { //只取出JSONObject中的data數據,相當於做了一次過濾,我們只需要data數據 Tuple2<JSONObject, TableProcess> data = Tuple2.of(value.f0.getJSONObject("data"), value.f1); //其實這個地方應該根據sink_cloumns的值進行一個過濾, 只保留需要sink的字段 filterColumns(data); /** * 從TableProcess配置中獲取每條數據應該去往哪里:getSink_type * hbase * kafka * */ String sink_type = value.f1.getSink_type(); if (TableProcess.SINK_TYPE_KAFKA.equals(sink_type)) { //如果這條數據中的配置顯示是kafka,則將數據發往kafka(使用主流) //事實數據較多,使用主流發往kafka out.collect(data); } else if (TableProcess.SINK_TYPE_HBASE.equals(sink_type)) { //如果這條數據中的配置顯示是hbase,則將數據發往hbase(使用側輸出流) //因為維度數據較少,故使用側輸出流發往hbase ctx.output(hbaseTag, data); } } private void filterColumns(Tuple2<JSONObject, TableProcess> data) { JSONObject jsonObject = data.f0; //將配置表中的配個字段切分開來,放到一個List集合中 /* id,activity_name,activity_type,activity_desc,start_time,end_time,create_time id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level */ List<String> columns = Arrays.asList(data.f1.getSink_columns().split(",")); //如果columns集合中沒有對應的key值,那么JSONObject中的這條數據就刪除它 jsonObject.keySet().removeIf(key -> !columns.contains(key)); } }); //將側輸出流轉換為DataStream DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag); return Tuple2.of(kafkaStream,hbaseStream); } private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(SingleOutputStreamOperator<JSONObject> etledStream, SingleOutputStreamOperator<TableProcess> tableProcessStream) { MapStateDescriptor<String, TableProcess> tpStateDesc = new MapStateDescriptor<>("tpStateDesc", String.class, TableProcess.class); /* 動態分流 目標: 應該得到一個新的流, 新的流存儲的數據類型應該是一個二維元組 <JSONObject, TableProcess> 碰到一條數據流中的數據, 找一個TableProcess key: source_table:operate_type value: TableProcess */ //1.將配置流做成廣播流 BroadcastStream<TableProcess> tpBroadcastStream = tableProcessStream.broadcast(tpStateDesc); //2.廣播流與數據流進行connect return etledStream.keyBy(obj -> obj.getString("table")) //以table分組,然后將每個table與廣播流connect .connect(tpBroadcastStream) .process(new KeyedBroadcastProcessFunction<String, JSONObject, TableProcess, Tuple2<JSONObject,TableProcess>>() { @Override public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<Tuple2<JSONObject,TableProcess>> out) throws Exception { //獲取廣播狀態,因為所有的配置信息都保存在了廣播狀態中 ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc); //拼接table:type作為key值 String key = value.getString("table") + ":" + value.getString("type").replaceAll("bootstrap-", ""); //根據在數據流中拼接出來的key值,從廣播狀態中取出對應的配置信息(即給每一條數據找一個TableProcess配置) TableProcess tableProcess = broadcastState.get(key); //如果tableProcess是null,證明這條數據不需要后面處理 if (tableProcess != null){ out.collect(Tuple2.of(value,tableProcess)); 
//將Json數據和配置數據返回(一個table對應一個配置信息) } } @Override public void processBroadcastElement(TableProcess value, Context ctx, Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception { //把來的每條配置都寫入到廣播狀態中 BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc); //從上下文環境中獲取廣播狀態 //拼接key,以保存到廣播狀態中 /* Source_table Operate_type activity_info insert activity_info update activity_rule insert activity_rule update activity_sku insert activity_sku update */ String key = value.getSource_table() + ":" + value.getOperate_type(); //一條記錄就是一個配置信息 broadcastState.put(key,value); } }); } private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) { /** * 第一次讀取全部數據 * 以后監控mysql中這個配置表的數據的更新 * */ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment); tableEnvironment.executeSql("create table `table_process` (\n" + " `source_table` string,\n" + " `operate_type` string,\n" + " `sink_type` string,\n" + " `sink_table` string,\n" + " `sink_columns` string,\n" + " `sink_pk` string,\n" + " `sink_extend` string,\n" + " primary key (`source_table`,`operate_type`) not enforced\n" + ")with(" + " 'connector' = 'mysql-cdc', " + " 'hostname' = 'hadoop162', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = 'aaaaaa', " + " 'database-name' = 'flink_realtime', " + " 'table-name' = 'table_process', " + " 'debezium.snapshot.mode' = 'initial' " + ")" ); /** * initial: 啟動的時候會讀取表中所有的數據, 放在內存中, 全部數據讀取完成之后, 會使用binlog來監控mysql的變化 * never: 只用binlog來監控mysql的變化 */ Table table_process = tableEnvironment.from("table_process"); return tableEnvironment .toRetractStream(table_process, TableProcess.class) //將table轉化為可以新增和變化的dataStream .filter(t -> t.f0) //過濾出變化的數據 .map(t -> t.f1); //返回數據:TableProcess } private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) { return dataStream .map(JSON::parseObject) //將流中的數據轉為JSON格式 .filter(obj -> obj.getString("database") != null && obj.getString("table") != null && obj.getString("type") != null && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type"))) && obj.getString("data") != null && obj.getString("data").length() > 10 ); } }
(7) Start HBase
start-hbase.sh
(8) Open the Phoenix client and list the tables; there is nothing yet
/opt/module/phoenix-5.0.0/bin/sqlline.py
(9) Start DwdDbApp to receive the data read from Kafka and the configuration table read via Flink CDC
(10) Use maxwell-bootstrap to load the historical data into Kafka (Maxwell must be started first)
/opt/module/maxwell-1.27.1/bin/maxwell --config /opt/module/maxwell-1.27.1/config.properties --daemon
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1
(11) The user_info table has been created
(12) Run maxwell-bootstrap again to load the historical activity_info data into Kafka; the activity_info table is created and populated
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table activity_info --client_id maxwell_1
2) Sink to Kafka
(1) Add a getKafkaSink() method to FlinkSinkUtil
public static FlinkKafkaProducer<Tuple2<JSONObject, TableProcess>> getKafkaSink() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092");
    properties.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");
    return new FlinkKafkaProducer<Tuple2<JSONObject, TableProcess>>(
            "default",
            // Kafka serializer: the sink_table from the record's config row decides the target topic
            new KafkaSerializationSchema<Tuple2<JSONObject, TableProcess>>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(Tuple2<JSONObject, TableProcess> element, @Nullable Long timestamp) {
                    return new ProducerRecord<>(
                            element.f1.getSink_table(),   // each record carries its config row; its sink_table is used as the topic
                            null,
                            element.f0.toJSONString().getBytes(StandardCharsets.UTF_8)   // the fact data (JSONObject) becomes the message value
                    );
                }
            },
            properties,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE   // exactly-once semantics
    );
}
(2) Call it in DwdDbApp; the complete DwdDbApp code is as follows
package com.yuange.flinkrealtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yuange.flinkrealtime.app.BaseAppV1;
import com.yuange.flinkrealtime.bean.TableProcess;
import com.yuange.flinkrealtime.common.Constant;
import com.yuange.flinkrealtime.util.FlinkSinkUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Arrays;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/29 11:33
 * Business data is captured by Maxwell into Kafka to form the ODS layer; Flink then consumes and processes that ODS data.
 * Configuration data is read directly from MySQL with Flink CDC and monitored in real time: whenever the config table changes,
 * Flink CDC picks up the change and the job reacts accordingly.
 */
public class DwdDbApp extends BaseAppV1 {

    public static void main(String[] args) {
        new DwdDbApp().init(
                2002,                  // port
                1,                     // parallelism
                "DwdDbApp",            // checkpoint directory name on HDFS
                "DwdDbApp",            // consumer group
                Constant.TOPIC_ODS_DB  // topic to consume
        );
    }

    @Override
    protected void run(StreamExecutionEnvironment environment, DataStreamSource<String> dataStream) {
        // 1. ETL the raw data
        SingleOutputStreamOperator<JSONObject> etledStream = etlDataStream(dataStream);
        // 2. Read the config table and get a config stream (CDC)
        SingleOutputStreamOperator<TableProcess> tableProcessStream = readTableProcess(environment);
        // 3. Connect the data stream with the config stream: each JSONObject record is paired with its TableProcess config
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams = connectStreams(etledStream, tableProcessStream);
        // 4. Dynamically split each record according to its config
        Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> kafkaHbaseStreams = dynamicSplit(connectedStreams);
        // kafkaHbaseStreams.f0.print("kafka");
        // kafkaHbaseStreams.f1.print("hbase");
        // 5. Write the dimension tables to HBase
        sendToHbase(kafkaHbaseStreams.f1);
        // 6. Write the fact tables to Kafka
        sendToKafka(kafkaHbaseStreams.f0);
    }

    private void sendToKafka(SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> stream) {
        stream.addSink(FlinkSinkUtil.getKafkaSink());
    }

    private void sendToHbase(DataStream<Tuple2<JSONObject, TableProcess>> stream) {
        /*
         * When writing to HBase (via Phoenix), tables are not created automatically:
         * 1. create the table first (dynamically)
         * 2. then write the data
         */
        stream
                .keyBy(t -> t.f1.getSink_table())  // key by the sink_table from the config table
                .addSink(FlinkSinkUtil.getHbaseSink());
    }

    private Tuple2<SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>>, DataStream<Tuple2<JSONObject, TableProcess>>> dynamicSplit(
            SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectedStreams) {
        // side-output tag for the HBase (dimension) stream
        OutputTag<Tuple2<JSONObject, TableProcess>> hbaseTag = new OutputTag<Tuple2<JSONObject, TableProcess>>("hbase") {};

        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> kafkaStream = connectedStreams
                .process(new ProcessFunction<Tuple2<JSONObject, TableProcess>, Tuple2<JSONObject, TableProcess>>() {
                    @Override
                    public void processElement(Tuple2<JSONObject, TableProcess> value,
                                               Context ctx,
                                               Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        // keep only the "data" part of the JSONObject; everything else is dropped
                        Tuple2<JSONObject, TableProcess> data = Tuple2.of(value.f0.getJSONObject("data"), value.f1);
                        // filter the fields according to sink_columns, keeping only the fields that need to be sunk
                        filterColumns(data);
                        /*
                         * The TableProcess config (getSink_type) decides where each record goes:
                         *   hbase
                         *   kafka
                         */
                        String sink_type = value.f1.getSink_type();
                        if (TableProcess.SINK_TYPE_KAFKA.equals(sink_type)) {
                            // fact data is large, so it goes to Kafka through the main stream
                            out.collect(data);
                        } else if (TableProcess.SINK_TYPE_HBASE.equals(sink_type)) {
                            // dimension data is small, so it goes to HBase through the side output
                            ctx.output(hbaseTag, data);
                        }
                    }

                    private void filterColumns(Tuple2<JSONObject, TableProcess> data) {
                        JSONObject jsonObject = data.f0;
                        // split the sink_columns field of the config into a list, e.g.:
                        //   id,activity_name,activity_type,activity_desc,start_time,end_time,create_time
                        //   id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level
                        List<String> columns = Arrays.asList(data.f1.getSink_columns().split(","));
                        // remove every key from the JSONObject that is not listed in sink_columns
                        jsonObject.keySet().removeIf(key -> !columns.contains(key));
                    }
                });

        // turn the side output into a DataStream
        DataStream<Tuple2<JSONObject, TableProcess>> hbaseStream = kafkaStream.getSideOutput(hbaseTag);
        return Tuple2.of(kafkaStream, hbaseStream);
    }

    private SingleOutputStreamOperator<Tuple2<JSONObject, TableProcess>> connectStreams(
            SingleOutputStreamOperator<JSONObject> etledStream,
            SingleOutputStreamOperator<TableProcess> tableProcessStream) {
        MapStateDescriptor<String, TableProcess> tpStateDesc =
                new MapStateDescriptor<>("tpStateDesc", String.class, TableProcess.class);
        /*
         * Dynamic routing.
         * Goal: a new stream whose records are 2-tuples <JSONObject, TableProcess>,
         * i.e. for each record in the data stream find its TableProcess:
         *   key:   source_table:operate_type
         *   value: TableProcess
         */
        // 1. broadcast the config stream
        BroadcastStream<TableProcess> tpBroadcastStream = tableProcessStream.broadcast(tpStateDesc);
        // 2. connect the data stream with the broadcast stream
        return etledStream
                .keyBy(obj -> obj.getString("table"))  // key by table, then connect with the broadcast stream
                .connect(tpBroadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, JSONObject, TableProcess, Tuple2<JSONObject, TableProcess>>() {
                    @Override
                    public void processElement(JSONObject value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        // all config records live in the broadcast state
                        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);
                        // build the key as table:type; the "bootstrap-" prefix added by maxwell-bootstrap is stripped
                        String key = value.getString("table") + ":" + value.getString("type").replaceAll("bootstrap-", "");
                        // look up the config for this record (each record gets its own TableProcess)
                        TableProcess tableProcess = broadcastState.get(key);
                        // a null config means this record needs no further processing
                        if (tableProcess != null) {
                            // emit the JSON record together with its config (one table maps to one config)
                            out.collect(Tuple2.of(value, tableProcess));
                        }
                    }

                    @Override
                    public void processBroadcastElement(TableProcess value,
                                                        Context ctx,
                                                        Collector<Tuple2<JSONObject, TableProcess>> out) throws Exception {
                        // write every incoming config record into the broadcast state
                        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(tpStateDesc);
                        /*
                         * Build the key stored in the broadcast state, e.g.:
                         *   source_table    operate_type
                         *   activity_info   insert
                         *   activity_info   update
                         *   activity_rule   insert
                         *   activity_rule   update
                         *   activity_sku    insert
                         *   activity_sku    update
                         */
                        String key = value.getSource_table() + ":" + value.getOperate_type();
                        // one row of the config table is one config entry
                        broadcastState.put(key, value);
                    }
                });
    }

    private SingleOutputStreamOperator<TableProcess> readTableProcess(StreamExecutionEnvironment environment) {
        /*
         * Read the whole config table on the first run,
         * then keep monitoring the table in MySQL for updates.
         */
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        tableEnvironment.executeSql("create table `table_process` (\n" +
                " `source_table` string,\n" +
                " `operate_type` string,\n" +
                " `sink_type` string,\n" +
                " `sink_table` string,\n" +
                " `sink_columns` string,\n" +
                " `sink_pk` string,\n" +
                " `sink_extend` string,\n" +
                " primary key (`source_table`,`operate_type`) not enforced\n" +
                ")with(" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'hadoop162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'aaaaaa', " +
                " 'database-name' = 'flink_realtime', " +
                " 'table-name' = 'table_process', " +
                " 'debezium.snapshot.mode' = 'initial' " +
                ")"
        );
        /*
         * initial: on startup read all existing rows, then switch to the binlog to monitor MySQL changes
         * never:   only use the binlog to monitor MySQL changes
         */
        Table table_process = tableEnvironment.from("table_process");
        return tableEnvironment
                .toRetractStream(table_process, TableProcess.class)  // turn the Table into a retract stream
                .filter(t -> t.f0)  // keep only the add messages (f0 == true)
                .map(t -> t.f1);    // keep just the TableProcess
    }

    private SingleOutputStreamOperator<JSONObject> etlDataStream(DataStreamSource<String> dataStream) {
        return dataStream
                .map(JSON::parseObject)  // parse each record into a JSONObject
                .filter(obj -> obj.getString("database") != null
                        && obj.getString("table") != null
                        && obj.getString("type") != null
                        && (obj.getString("type").contains("insert") || "update".equals(obj.getString("type")))
                        && obj.getString("data") != null
                        && obj.getString("data").length() > 10
                );
    }
}
(3) Start DwdDbApp
(4) Generate business data to simulate new inserts
cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar
(5) Check Kafka (a hedged consumer command is sketched below)
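One hedged way to confirm that the split data really lands in Kafka is to tail one of the DWD topics with the console consumer. The install path /opt/module/kafka, the broker address hadoop162:9092, and the topic name dwd_order_info are assumptions here; substitute whatever your table_process config actually writes.
# Assumptions: Kafka at /opt/module/kafka, broker at hadoop162:9092, DWD topic named dwd_order_info.
/opt/module/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop162:9092 \
  --topic dwd_order_info \
  --from-beginning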
3) Package and deploy flink-realtime to Linux
(1) Package with Maven (run a clean first, and stop DwdDbApp in IDEA before packaging); a hedged command is sketched below
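A minimal packaging sketch, assuming it is run from the flink-realtime module directory and that skipping tests is acceptable (both assumptions):
# Run inside the flink-realtime module (assumed); clean first, then let the shade plugin build the fat jar.
mvn clean package -DskipTests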
(2) Upload the packaged flink-realtime-1.0-SNAPSHOT.jar to /opt/module/applog
(3) Start a yarn-session (Hadoop, ZooKeeper, Kafka, and HBase must be running first)
/opt/module/flink-yarn/bin/yarn-session.sh -d
(4) Submit flink-realtime-1.0-SNAPSHOT.jar to Yarn; write a submission script to avoid repeating the work
vim /home/atguigu/bin/realtime.sh
#!/bin/bash
flink=/opt/module/flink-yarn/bin/flink
jar=/opt/module/applog/flink-realtime-1.0-SNAPSHOT.jar

apps=(
    com.yuange.flinkrealtime.app.dwd.DwdLogApp
    com.yuange.flinkrealtime.app.dwd.DwdDbApp
)

for app in ${apps[*]} ; do
    $flink run -d -c $app $jar
done
chmod +x /home/atguigu/bin/realtime.sh
/home/atguigu/bin/realtime.sh
(5) Check how the jobs are running: exception messages appear (a hedged way to inspect them is sketched below)
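A hedged way to dig into the failure, assuming the jobs were submitted to the yarn-session started above; <application_id> is a placeholder to be taken from the list output, not a real value:
# Find the application id of the yarn-session / Flink jobs.
yarn application -list -appStates ALL
# Dump its container logs and search for exceptions (replace the placeholder).
yarn logs -applicationId <application_id> | grep -i exception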
(6) The cause is a dependency conflict: flink-realtime-1.0-SNAPSHOT.jar and the Flink installation on Linux ship some of the same dependencies; removing the duplicated jars from one side fixes it (here they are removed from Flink's lib directory)
cd /opt/module/flink-yarn/lib
rm -rf flink-connector-jdbc_2.12-1.13.1.jar flink-connector-kafka_2.12-1.13.1.jar mysql-connector-java-5.1.27-bin.jar
(7) Restart the yarn-session, then submit flink-realtime-1.0-SNAPSHOT.jar to Yarn again
(8) Check again; there are no more exception logs
(9) Delete the tables in HBase
(10) Delete all topics in Kafka (a hedged cleanup loop is sketched below)
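A hedged helper for clearing the topics, assuming Kafka lives under /opt/module/kafka and the broker is hadoop162:9092 (both assumptions); it lists every non-internal topic and deletes them one by one:
# Assumptions: Kafka at /opt/module/kafka, broker at hadoop162:9092.
KAFKA_HOME=/opt/module/kafka
for topic in $($KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server hadoop162:9092 --list | grep -v '^__'); do
    $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server hadoop162:9092 --delete --topic "$topic"
done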
(11) Start the log server to test whether logs reach Kafka (Nginx must already be running)
log-lg.sh start
(12) Start Maxwell, then use maxwell-bootstrap to import the data
vim /home/atguigu/bin/maxwell.sh
#!/bin/bash
maxwell_home=/opt/module/maxwell-1.27.1
host=$(hostname)   # host name shown in the log messages

case $1 in
start)
    echo "========== starting maxwell on $host =========="
    source /etc/profile
    $maxwell_home/bin/maxwell --config $maxwell_home/config.properties --daemon
    ;;
stop)
    echo "========== stopping maxwell on $host =========="
    source /etc/profile
    jps | awk '/Maxwell/ {print $1}' | xargs kill
    ;;
*)
    echo "Invalid argument"
    echo "  start   start maxwell"
    echo "  stop    stop maxwell"
    ;;
esac
chmod +x /home/atguigu/bin/maxwell.sh
maxwell.sh start
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1
(13) Check whether the user_info table now exists in HBase and contains data (a hedged Phoenix query is sketched below)
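One hedged way to check is through Phoenix, assuming the client is installed under /opt/module/phoenix, ZooKeeper runs on hadoop162:2181, and the sink created the dimension table as USER_INFO (all assumptions; the real table name comes from the sink_table config):
# Assumptions: Phoenix at /opt/module/phoenix, ZooKeeper at hadoop162:2181, dimension table named USER_INFO.
/opt/module/phoenix/bin/sqlline.py hadoop162:2181
# then, inside the sqlline prompt:
#   !tables
#   select count(*) from USER_INFO;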
(14) Generate log data and check whether it arrives in Kafka
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar
(15) Generate business data and check whether it arrives in Kafka
cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar