flink-----實時項目---day04-------1. 案例:統計點擊、參與某個活動的人數和次數 2. 活動指標多維度統計(自定義redisSink)


1. 案例

用戶ID,活動ID,時間,事件類型,省份
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,2,北京市
u002,A1,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,2,北京市

事件類型:
  0:曝光
  1:點擊
  2:參與

需求:統計點擊、參與某個活動的人數和次數
  • 方案一:使用ValueState結合HashSet實現

 具體代碼如下

ActivityCountAdv1

package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;

/**
 * Per (activityId, eventType) key, emits the number of distinct users and the
 * total (non-deduplicated) event count, keeping a HashSet of user IDs in
 * keyed ValueState for de-duplication.
 *
 * <p>Input lines: uid,activityId,time,eventType,province
 * (eventType: 0 = exposure, 1 = click, 2 = participation).
 *
 * <p>NOTE(review): the HashSet kept in state grows without bound as distinct
 * users accumulate, which can exhaust memory — see ActivityCountAdv2 for the
 * BloomFilter-based variant.
 */
public class ActivityCountAdv1 {
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        // Split each CSV line into (uid, activityId, date, eventType, province)
        SingleOutputStreamOperator<Tuple5<String, String, String, Integer, String>> tpDataStream = lines.map(new MapFunction<String, Tuple5<String, String, String, Integer, String>>() {
            @Override
            public Tuple5<String, String, String, Integer, String> map(String line) throws Exception {
                String[] fields = line.split(",");
                String uid = fields[0];
                String activityID = fields[1];
                String date = fields[2];
                Integer type = Integer.parseInt(fields[3]);
                String prince = fields[4]; // province (variable name typo kept as-is)
                return Tuple5.of(uid, activityID, date, type, prince);
            }
        });
        // Key by activity ID (tuple field 1) and event type (tuple field 3)
        KeyedStream<Tuple5<String, String, String, Integer, String>, Tuple> keyed = tpDataStream.keyBy(1, 3);

        keyed.process(new KeyedProcessFunction<Tuple, Tuple5<String, String, String, Integer, String>, Tuple4<String, Integer, Integer, Integer>>() {
            // Keyed state holding the set of distinct user IDs seen for this key
            private transient ValueState<HashSet<String>> uidState;

            // Keyed state holding the total (non-deduplicated) event count
            private transient ValueState<Integer> countState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Descriptor for the distinct-user set; TypeHint captures the generic type
                ValueStateDescriptor<HashSet<String>> stateDescriptor1 = new ValueStateDescriptor<HashSet<String>>(
                        "uid-state",
                        TypeInformation.of(new TypeHint<HashSet<String>>(){})
                );
                // Descriptor for the event counter
                ValueStateDescriptor<Integer> stateDescriptor2 = new ValueStateDescriptor<Integer>(
                        "count-state",
                        Integer.class
                );
                // Bind both states to the runtime context
                uidState = getRuntimeContext().getState(stateDescriptor1);
                countState = getRuntimeContext().getState(stateDescriptor2);
            }

            @Override
            public void processElement(Tuple5<String, String, String, Integer, String> value, Context ctx, Collector<Tuple4<String, Integer, Integer, Integer>> out) throws Exception {
                String uid = value.f0;
                String aid = value.f1;
                Integer type = value.f3;
                // De-duplicate users via the HashSet kept in state, then write it back
                HashSet<String> hashSet = uidState.value();
                if(hashSet == null){
                    hashSet = new HashSet<>();
                }
                hashSet.add(uid);
                uidState.update(hashSet);
                // Accumulate the total event count (NOT deduplicated, unlike the user set)
                Integer count = countState.value();
                if(count == null) {
                    count = 0;
                }
                count += 1;
                countState.update(count);
                // Emit (activityId, eventType, distinct-user count, total count)
                out.collect(Tuple4.of(aid,type,hashSet.size(), count));
            }
        }).print();
        env.execute();
    }
}
View Code

  如果使用HashSet去重,當用戶基數較大時,會大量消耗資源,導致性能變低,甚至內存溢出

  • 方案二:改進,使用BloomFilter存儲用戶的ID,BloomFilter可以判斷用戶一定不存在,使用的內存極少。但是使用BloomFilter沒有計數器,就必須額外定義一個狀態,存儲去重的人數

ActivityCountAdv2

package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;

/**
 * Same job as ActivityCountAdv1, but the per-key distinct-user set is a Guava
 * BloomFilter kept in ValueState instead of a HashSet, using far less memory.
 * Because a BloomFilter has no size/counter, a separate Integer state tracks
 * the deduplicated user count.
 *
 * <p>Input lines: uid,activityId,time,eventType,province.
 *
 * <p>NOTE: a BloomFilter may report false positives, so the distinct-user
 * count can slightly undercount; it never overcounts.
 */
public class ActivityCountAdv2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Input lines look like: u001,A1,2019-09-02 10:10:11,1,北京市
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Split each CSV line into (uid, aid, time, type, province)
        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> tpDataStream = lines.map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
            @Override
            public Tuple5<String, String, String, String, String> map(String line) throws Exception {
                String[] fields = line.split(",");
                String uid = fields[0];
                String aid = fields[1];
                String time = fields[2];
                String type = fields[3];
                String province = fields[4];
                return Tuple5.of(uid, aid, time, type, province);
            }
        });

        // Key by activity ID (tuple field 1) and event type (tuple field 3)
        KeyedStream<Tuple5<String, String, String, String, String>, Tuple> keyed = tpDataStream.keyBy(1, 3);

        keyed.process(new KeyedProcessFunction<Tuple, Tuple5<String, String, String, String, String>, Tuple4<String, String, Integer, Integer>>() {

            // Probabilistic set of user IDs seen for this key.
            // Typed BloomFilter<CharSequence> (matches Funnels.unencodedCharsFunnel())
            // instead of the raw type the original used.
            private transient ValueState<BloomFilter<CharSequence>> uidState;

            // Deduplicated user count (the BloomFilter itself has no counter)
            private transient ValueState<Integer> uidCountState;

            // Total event count (not deduplicated)
            private transient ValueState<Integer> countState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Descriptor for the BloomFilter state
                ValueStateDescriptor<BloomFilter<CharSequence>> stateDescriptor1 = new ValueStateDescriptor<>(
                        "uid-state",
                        TypeInformation.of(new TypeHint<BloomFilter<CharSequence>>(){})
                );

                // Descriptor for the total event counter
                ValueStateDescriptor<Integer> stateDescriptor2 = new ValueStateDescriptor<>(
                        "count-state",
                        Integer.class
                );

                // Descriptor for the deduplicated user counter
                ValueStateDescriptor<Integer> stateDescriptor3 = new ValueStateDescriptor<>(
                        "uid-count-state",
                        Integer.class
                );

                // Bind all three states to the runtime context
                uidState = getRuntimeContext().getState(stateDescriptor1);
                countState = getRuntimeContext().getState(stateDescriptor2);
                uidCountState = getRuntimeContext().getState(stateDescriptor3);
            }

            @Override
            public void processElement(Tuple5<String, String, String, String, String> value, Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {
                String uid = value.f0;
                String aid = value.f1;
                String type = value.f3;
                // De-duplicate users via the BloomFilter kept in state
                BloomFilter<CharSequence> bloomFilter = uidState.value();
                Integer uidCount = uidCountState.value(); // distinct users
                Integer count = countState.value();       // total events
                if(count == null) {
                    count = 0;
                }
                if(bloomFilter == null) {
                    // First element for this key: size the filter for ~10M expected insertions
                    bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
                    uidCount = 0;
                }
                if(!bloomFilter.mightContain(uid)) {
                    bloomFilter.put(uid); // definitely unseen: record it and bump the distinct count
                    uidCount += 1;
                }
                count += 1;
                countState.update(count);
                uidState.update(bloomFilter);
                uidCountState.update(uidCount);
                out.collect(Tuple4.of(aid, type, uidCount, count));
            }
        }).print();

        env.execute();

    }
}
View Code

 

 2. 活動指標多維度統計

  此處要進行多次keyBy操作(一種維度就需要keyBy一次),相當繁瑣。此處是將聚合結果存入redis來保存,所以不需要使用flink中的state,具體見代碼

ActivityCountWithMultiDimension

package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import sun.awt.geom.AreaOp;

/**
 * Multi-dimensional activity statistics: the same bean stream is keyed at
 * three granularities (activity+type; +date; +province) and each running sum
 * is written to Redis through MyRedisSink, so the aggregated results live in
 * Redis rather than in user-defined Flink state.
 *
 * <p>args[0] must be a properties file containing the redis.* connection
 * settings that MyRedisSink reads from the global job parameters.
 */
public class ActivityCountWithMultiDimension {

    public static void main(String[] args) throws Exception{

        // Load redis.* settings and expose them to every operator via global job parameters
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameters);

        // Input lines look like: u001,A1,2019-09-02 10:10:11,1,北京市
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<ActivityBean> beanStream = lines.map(new MapFunction<String, ActivityBean>() {

            @Override
            public ActivityBean map(String line) throws Exception {
                String[] fields = line.split(",");
                String uid = fields[0];
                String aid = fields[1];
                String date = fields[2].split(" ")[0]; // keep only the yyyy-MM-dd part
                String type = fields[3];
                String province = fields[4];
                return ActivityBean.of(uid, aid, date, type, province);
            }
        });

        // Three aggregation granularities over the same stream (each bean carries count = 1)
        SingleOutputStreamOperator<ActivityBean> res1 = beanStream.keyBy("aid", "type").sum("count");

        SingleOutputStreamOperator<ActivityBean> res2 = beanStream.keyBy("aid", "type", "date").sum("count");

        SingleOutputStreamOperator<ActivityBean> res3 = beanStream.keyBy("aid", "type", "date", "province").sum("count");

        // Each result maps to (redisKey, hashField, value) and is written via HSET in MyRedisSink
        res1.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean value) throws Exception {
                return Tuple3.of(Constant.ACTIVITY_COUNT +"-"+ value.aid,  value.type, value.count.toString());
            }
        }).addSink(new MyRedisSink());

        res2.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean value) throws Exception {
                return Tuple3.of(Constant.DAILY_ACTIVITY_COUNT + "-" + value.aid + "-" + value.date, value.type, value.count.toString());
            }
        }).addSink(new MyRedisSink());

        res3.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean value) throws Exception {
                return Tuple3.of(Constant.PROVINCE_DAILY_ACTIVITY_COUNT + "-" + value.aid + "-" + value.date + "-" + value.province, value.type, value.count.toString());
            }
        }).addSink(new MyRedisSink());

        env.execute();
    }
}
View Code

Constant

package cn._51doit.flink.day08;

/** Redis key prefixes for the activity statistics written by MyRedisSink. */
public class Constant {

    /** Totals per activity + event type. */
    public static final String ACTIVITY_COUNT = "ACTIVITY_COUNT";

    /** Totals per activity + event type + day. */
    public static final String DAILY_ACTIVITY_COUNT = "DAILY_ACTIVITY_COUNT";

    /** Totals per activity + event type + day + province. */
    public static final String PROVINCE_DAILY_ACTIVITY_COUNT = "PROVINCE_DAILY_ACTIVITY_COUNT";

    // Constants holder — not instantiable (the original had an implicit public constructor).
    private Constant() {
    }
}
View Code

ActivityBean

package cn._51doit.flink.day08;

/**
 * Flink POJO carrying one activity event. Fields are intentionally public and
 * a no-arg constructor is provided because the field-expression keyBy/sum
 * calls ("aid", "count", ...) require POJO-style access — do not make the
 * fields private or final.
 */
public class ActivityBean {

    public String uid;      // user ID

    public String aid;      // activity ID

    public String date;     // event date, yyyy-MM-dd

    public String type;     // event type: 0 exposure, 1 click, 2 participation

    public String province; // province the event came from

    public Long count = 1L; // per-event weight, aggregated by .sum("count")

    public ActivityBean() {}

    public ActivityBean(String uid, String aid, String date, String type, String province) {
        this.uid = uid;
        this.aid = aid;
        this.date = date;
        this.type = type;
        this.province = province;
    }

    /** Static factory mirroring the five-arg constructor. */
    public static ActivityBean of(String uid, String aid, String date, String type, String province) {
        return new ActivityBean(uid, aid, date, type, province);
    }

    @Override
    public String toString() {
        // Fixed: the original omitted count, hiding the aggregated value when printing results.
        return "ActivityBean{" +
                "uid='" + uid + '\'' +
                ", aid='" + aid + '\'' +
                ", date='" + date + '\'' +
                ", type='" + type + '\'' +
                ", province='" + province + '\'' +
                ", count=" + count +
                '}';
    }
}
View Code

MyRedisSink

package cn._51doit.flink.day08;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;


/**
 * Sink that writes each (redisKey, hashField, value) triple to Redis with
 * HSET. Connection settings (redis.host, redis.password, redis.port,
 * redis.db) are read from the global job parameters registered in main().
 */
public class MyRedisSink extends RichSinkFunction<Tuple3<String, String, String>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        String password = params.getRequired("redis.password");
        int port = params.getInt("redis.port", 6379);
        int db = params.getInt("redis.db", 0);
        // Assign the field directly (the original shadowed it with a local variable,
        // which also leaked the connection if auth()/select() threw).
        jedis = new Jedis(host, port);
        jedis.auth(password);
        jedis.select(db);
    }

    @Override
    public void invoke(Tuple3<String, String, String> value, Context context) throws Exception {
        // Reconnect lazily if the connection dropped between invocations
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // f0 = redis key, f1 = hash field, f2 = value
        jedis.hset(value.f0, value.f1, value.f2);
    }

    @Override
    public void close() throws Exception {
        // Guard against NPE when open() failed before the connection was created
        if (jedis != null) {
            jedis.close();
        }
    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM