前言:接上一篇
1.需求描述:識別新老用戶
本身客戶端業務有新老用戶的標識,但是不夠准確,需要用實時計算再次確認(不涉及業務操作,只是單純的做個狀態確認)
2.利用側輸出流實現數據拆分
根據日志數據內容,將日志數據分成3類,頁面日志、啟動日志和曝光日志。頁面日志輸出到主流,啟動日志輸出到啟動側輸出流,曝光日志輸出到曝光日志側輸出流。
3.將不同流的數據推送下游的Kafka的不同topic中
代碼如下:
1.在MyKafkaUtil中添加【獲取Kafka消費者的方法】(讀)
注意:此方法是在上一篇的MyKafkaUtil中添加
/** * 獲取KafkaSource的方法 * * @param topic 主題 * @param groupId 消費者組 */ public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) { //給配置信息對象添加配置項 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); //獲取KafkaSource return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties); } }
2.Flink調用工具類讀取數據的主程序
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.lxz.gamll20210909.util.MyKafkaUtil; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; public class BaseLogApp { public static void main(String[] args) throws Exception { //1.獲取執行環境,設置並行度,開啟CK,設置狀態后端(HDFS) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //為Kafka主題的分區數 env.setParallelism(1); //1.1 設置狀態后端 // env.setStateBackend(new FsStateBackend("hdfs://hadoop201:8020/gmall/dwd_log/ck")); // //1.2 開啟CK // env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setCheckpointTimeout(60000L); //修改用戶名 System.setProperty("HADOOP_USER_NAME", "root"); //2.讀取Kafka ods_base_log 主題數據 String topic = "ods_base_log"; String groupId = "ods_dwd_base_log_app"; FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId); DataStreamSource<String> kafkaDS = env.addSource(kafkaSource); //3.將每行數據轉換為JsonObject // SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSONObject::parseObject); OutputTag<String> dirty = new OutputTag<String>("DirtyData") { }; SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() { @Override public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception { try { JSONObject jsonObject = JSON.parseObject(value); out.collect(jsonObject); } catch (Exception e) { ctx.output(dirty, value); } } }); // //打印測試 // jsonObjDS.print(">>>>>>>>>"); //4.按照Mid分組 // KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(data -> data.getJSONObject("common").getString("mid")); SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid")) .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() { // 定義狀態 private ValueState<String> isNewState; // 初始化狀態 @Override public void open(Configuration parameters) throws Exception { isNewState = getRuntimeContext().getState(new ValueStateDescriptor<String>("isNew-state", String.class)); } @Override public void processElement(JSONObject jsonObject, Context ctx, Collector<JSONObject> out) throws Exception { // 取出數據中“is_new”字段 String isNew = jsonObject.getJSONObject("common").getString("is_new"); // 如果isNew為1,則需要繼續校驗 if ("1".equals(isNew)) { //取出狀態中的數據,並判斷是否為null if (isNewState.value() != null) { //說明當前mid不是新用戶,修改is_new的值 jsonObject.getJSONObject("common").put("is_new", "0"); } else { // 說明為真正的新用戶 isNewState.update("0"); } } // 輸出數據 out.collect(jsonObject); } }); jsonObjWithNewFlag.print(">>>>>>>>>"); jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>"); //執行任務 env.execute(); } }
3.在服務器上開啟生產者假造數據,便於一會查看IDEA客戶端的數據輸出
bin/kafka-console-producer.sh --broker-list hadoop201:9092 --topic ods_base_log
4.啟動IDEA主程序並在Kafka生產者中插入數據(最好插入3次,模擬用戶二次進入和臟數據進入場景)
{"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone X","mid":"mid_16","os":"iOS 13.3.1","uid":"12","vc":"v2.1.132"},"start":{"entry":"icon","loading_time":16516,"open_ad_id":15,"open_ad_ms":2419,"open_ad_skip_ms":0},"ts":1631197168000}
5.觀察IDEA客戶端的數據輸出
6.結果可知,第一次is_new=1,數據插入模擬用戶第一次訪問,當再次插入數據模擬用戶第二次訪問,則主程序is_new變成0,標識用戶此次訪問不作為新用戶記錄,當第三次插入數據並刪除“}”,偽造臟數據時,IDEA客戶端識別數據時“dirty”並打印輸出。