Flink:識別新老訪客,Kafka消費數據到不同主題


前言:接上一篇

1.需求描述:識別新老用戶

本身客戶端業務有新老用戶的標識,但是不夠准確,需要用實時計算再次確認(不涉及業務操作,只是單純的做個狀態確認)

2.利用側輸出流實現數據拆分

根據日志數據內容,將日志數據分成3類,頁面日志、啟動日志和曝光日志。頁面日志輸出到主流,啟動日志輸出到啟動側輸出流,曝光日志輸出到曝光日志側輸出流。

3.將不同流的數據推送下游的Kafka的不同topic中

代碼如下:

1.在MyKafkaUtil中添加【獲取Kafka消費者的方法】(讀)

注意:此方法是在上一篇的MyKafkaUtil中添加
/*
* * 獲取KafkaSource的方法 * * @param topic 主題 * @param groupId 消費者組 */ public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) { //給配置信息對象添加配置項 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); //獲取KafkaSource return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties); } }

2.Flink調用工具類讀取數據的主程序

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lxz.gamll20210909.util.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;

public class BaseLogApp {

    public static void main(String[] args) throws Exception {

        //1.獲取執行環境,設置並行度,開啟CK,設置狀態后端(HDFS)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //為Kafka主題的分區數
        env.setParallelism(1);
        //1.1 設置狀態后端
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop201:8020/gmall/dwd_log/ck"));
//        //1.2 開啟CK
//        env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);

        //修改用戶名
        System.setProperty("HADOOP_USER_NAME", "root");

        //2.讀取Kafka ods_base_log 主題數據
        String topic = "ods_base_log";
        String groupId = "ods_dwd_base_log_app";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //3.將每行數據轉換為JsonObject
//        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSONObject::parseObject);
        OutputTag<String> dirty = new OutputTag<String>("DirtyData") {
        };
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    ctx.output(dirty, value);
                }
            }
        });

//        //打印測試
//        jsonObjDS.print(">>>>>>>>>");
        //4.按照Mid分組
//        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(data -> data.getJSONObject("common").getString("mid"));
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
                    // 定義狀態
                    private ValueState<String> isNewState;

                    // 初始化狀態
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        isNewState = getRuntimeContext().getState(new ValueStateDescriptor<String>("isNew-state", String.class));
                    }

                    @Override
                    public void processElement(JSONObject jsonObject, Context ctx, Collector<JSONObject> out) throws Exception {
                        // 取出數據中“is_new”字段
                        String isNew = jsonObject.getJSONObject("common").getString("is_new");
                        // 如果isNew為1,則需要繼續校驗
                        if ("1".equals(isNew)) {
                            //取出狀態中的數據,並判斷是否為null
                            if (isNewState.value() != null) {
                                //說明當前mid不是新用戶,修改is_new的值
                                jsonObject.getJSONObject("common").put("is_new", "0");
                            } else {
                                // 說明為真正的新用戶
                                isNewState.update("0");
                            }
                        }
                        // 輸出數據
                        out.collect(jsonObject);
                    }
                });

        jsonObjWithNewFlag.print(">>>>>>>>>");
        jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>");
     //執行任務
        env.execute();
    }
}

3.在服務器上開啟生產者假造數據,便於一會查看IDEA客戶端的數據輸出

bin/kafka-console-producer.sh --broker-list hadoop201:9092 --topic ods_base_log

4.啟動IDEA主程序並在Kafka生產者中插入數據(最好插入3次,模擬用戶二次進入和臟數據進入場景)

{"common":{"ar":"310000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone X","mid":"mid_16","os":"iOS 13.3.1","uid":"12","vc":"v2.1.132"},"start":{"entry":"icon","loading_time":16516,"open_ad_id":15,"open_ad_ms":2419,"open_ad_skip_ms":0},"ts":1631197168000}

5.觀察IDEA客戶端的數據輸出

 

 6.結果可知,第一次is_new=1,數據插入模擬用戶第一次訪問,當再次插入數據模擬用戶第二次訪問,則主程序is_new變成0,標識用戶此次訪問不作為新用戶記錄,當第三次插入數據並刪除“}”,偽造臟數據時,IDEA客戶端識別數據時“dirty”並打印輸出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM