Using Flink with MyBatis in Java to compute the past hour's user behavior data every five minutes in real time



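I am currently learning Flink and implemented a fairly common requirement: every five minutes, count the most-clicked products of the previous hour, and store the source data in MySQL.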

Implementation approach:

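Use a sliding window with a size of 1 hour and a slide of 5 minutes. Filter out every record that is not a click, group the stream by product with keyBy, and use an aggregate function to count the clicks of each product incrementally. A ProcessWindowFunction then wraps each result in a tuple <product id, click count>, which is finally written to a Redis zset, with the product id as the key and the click count as the score.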

All user behavior data and all late data are stored in MySQL asynchronously.
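To make the window arithmetic concrete, here is a minimal plain-Java sketch (no Flink dependency) of how one event timestamp maps onto sliding windows. The class and method names are hypothetical, and the windows are assumed to be aligned to the epoch with no offset. With a 1 hour size and a 5 minute slide, each event belongs to 12 overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: which sliding windows does one event-time timestamp fall into? */
class SlidingWindowSketch {

    /** Returns the [start, end) bounds (epoch millis) of every window containing ts. */
    static List<long[]> assignWindows(long ts, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that contains ts (assumes epoch-aligned windows).
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // Walk backwards one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long fiveMinutes = 5 * 60 * 1000L;
        long ts = 1_700_000_000_000L; // hypothetical event timestamp
        for (long[] w : assignWindows(ts, hour, fiveMinutes)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

This is why the job below counts incrementally: every click updates 12 window states, so keeping one counter per window is much cheaper than buffering the raw events.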

Here is my code.

User behavior entity class:

@Data
public class UserBehavingInfo {

    private String userNo;

    /**
     * User behavior
     */
    private String behavior;

    /**
     * Product the behavior was performed on
     */
    private String operatedGoods;

    /**
     * Time at which the behavior occurred
     */
    private Long time;
}

The main function:

public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.219.128:9092");
        properties.setProperty("group.id", "event");
        // key.serializer / value.serializer are producer settings and are not needed here:
        // the consumer decodes each record with the SimpleStringSchema passed below

        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(("eventDetails"), new SimpleStringSchema(), properties);
        // Flink keeps the Kafka offsets as state (snapshotted when checkpointing is enabled)
        // start from the offsets committed for the consumer group
        kafkaConsumer.setStartFromGroupOffsets();

        // receive the Kafka data and convert it to UserBehavingInfo objects
        SingleOutputStreamOperator<UserBehavingInfo> input =
                env.addSource(kafkaConsumer)
                        .map(string -> JSON.parseObject(string, UserBehavingInfo.class)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBehavingInfo>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(UserBehavingInfo userBehavingInfo) {
                        System.out.println("mark:" + (userBehavingInfo.getTime() - 5*1000L));
                        return userBehavingInfo.getTime();
                    }
                }).setParallelism(1);
        // asynchronously insert the user behavior data into MySQL
        // Async I/O against MySQL: timeout 1 s, capacity 10
        // unorderedWait emits results in no particular order (with event time, order is still kept relative to watermarks); orderedWait emits results in input order (FIFO)
        // with more than 10 requests in flight, backpressure throttles the upstream operators
        AsyncDataStream.unorderedWait(input, new AsyncInsertUserBehaviorToMysql(), 1000, TimeUnit.MILLISECONDS, 10);

        SingleOutputStreamOperator<UserBehavingInfo> filterClick = input.filter(new FilterFunction<UserBehavingInfo>() {
            @Override
            public boolean filter(UserBehavingInfo userBehavingInfo) throws Exception {
                return "click".equals(userBehavingInfo.getBehavior());
            }
        });

        // create the side output for late data
        OutputTag<UserBehavingInfo> lateOutputUserBehavior = new OutputTag<UserBehavingInfo>("late-userBehavior-data"){};
        SingleOutputStreamOperator<Tuple2<String, Integer>> aggregateUserClick = filterClick
                .keyBy(new KeySelector<UserBehavingInfo, String>() {
                    @Override
                    public String getKey(UserBehavingInfo userBehavingInfo) throws Exception {
                        return userBehavingInfo.getOperatedGoods();
                    }
                })
                .window(SlidingEventTimeWindows.of(
                        // production values: Time.hours(1), Time.minutes(5)
                        // shortened values for local testing:
                        Time.seconds(10),Time.seconds(5)
                ))
                .allowedLateness(Time.hours(1))
                .sideOutputLateData(lateOutputUserBehavior)
                // incrementally count the user clicks
                .aggregate(new UserBehavorCountAggregateUtils(), new UserBehavorCountWindowFunction());

        aggregateUserClick.print();
        // late data does not trigger the window; store it in the database
        AsyncDataStream.unorderedWait(aggregateUserClick.getSideOutput(lateOutputUserBehavior), new AsyncInsertUserBehaviorToMysql(), 1000, TimeUnit.MILLISECONDS, 10);

        // write to Redis under the key rank:click
        FlinkJedisPoolConfig redis = new FlinkJedisPoolConfig.Builder().setDatabase(1).setHost("192.168.219.128").setPort(6379).setPassword("redis").build();
        aggregateUserClick.addSink(new RedisSink<>(redis,new UserBehaviorRedisMapper()));
        env.execute("userClickBehaviorRank");
    }
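The interplay between the 5 second out-of-orderness bound, allowedLateness and the late-data side output can be sketched in plain Java. This is a simplified model of my understanding of Flink's behavior, not its actual implementation, and the class and method names are hypothetical: the watermark trails the largest timestamp seen so far by the bound, and a window counts as expired once the watermark reaches the window's last timestamp plus the allowed lateness.

```java
/** Sketch of bounded-out-of-orderness watermarks and the late-window check. */
class LatenessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    LatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp and returns the resulting watermark. */
    long onEvent(long ts) {
        // The watermark never moves backwards: only a new maximum advances it.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    /** True once the window [.., windowEnd) can no longer accept elements. */
    static boolean isWindowLate(long windowEnd, long allowedLatenessMs, long watermark) {
        long lastTimestampInWindow = windowEnd - 1;
        return lastTimestampInWindow + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        LatenessSketch sketch = new LatenessSketch(5_000L);
        long watermark = sketch.onEvent(20_000L);
        System.out.println("watermark = " + watermark);
        System.out.println("late without allowed lateness: " + isWindowLate(10_000L, 0L, watermark));
        System.out.println("late with 1 h allowed lateness: " + isWindowLate(10_000L, 3_600_000L, watermark));
    }
}
```

Under this model an element is routed to the side output only when every window it would be assigned to has already expired, so with allowedLateness of one hour very few records should end up there.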

Use Flink's Async I/O (contributed by Alibaba) to access the database and store the user behavior data:

/**
 * Asynchronously insert user behavior data into MySQL
 */
public class AsyncInsertUserBehaviorToMysql extends RichAsyncFunction<UserBehavingInfo, Integer> {

    Logger logger = LoggerFactory.getLogger(AsyncInsertUserBehaviorToMysql.class);

    // MyBatis session, created in open()
    private transient SqlSession sqlSession ;
    /**
     * Initialize the connection in the open method
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("async function for mysql java open ..."+Thread.currentThread().getName());
        super.open(parameters);
        sqlSession =  MybatisSessionFactory.getSqlSessionFactory().openSession();
    }

    /**
     * Insert one user behavior record and complete the future with the number of inserted rows
     *
     * @param asyncUser
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(UserBehavingInfo asyncUser, ResultFuture<Integer> resultFuture) throws Exception {
        Integer insertNum = 0;
        try{

            UserBehaviorDetailsMapper mapper = sqlSession.getMapper(UserBehaviorDetailsMapper.class);
            // note: this MyBatis call blocks the calling thread until the insert returns
            insertNum = mapper.insertUserBehavior(asyncUser);
            sqlSession.commit();
            System.out.println("inserted into database: "+insertNum);
        }catch (Exception throwable){
            sqlSession.rollback();
            System.out.println("exception, rolled back: "+ throwable);
        }finally {
            // always complete resultFuture, otherwise every record ends in a timeout
            resultFuture.complete(Collections.singletonList(insertNum));
        }
        }
    }


    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        // close the session and release resources
        sqlSession.close();
        super.close();
    }


}
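Because the MyBatis insert is a blocking JDBC call, running it directly inside asyncInvoke keeps the calling thread busy for the whole round trip. One common way around this is to hand the blocking call to a thread pool and complete the result from a callback. The following plain-Java sketch shows only that pattern, without Flink or MyBatis; the class name, the insertAsync helper and the stand-in insert lambdas are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.ToIntFunction;

/** Sketch: move a blocking insert off the calling thread. */
class AsyncInsertSketch {

    /**
     * Runs blockingInsert on the pool and returns a future that completes
     * with the inserted row count, or with 0 if the insert throws.
     */
    static <T> CompletableFuture<Integer> insertAsync(
            T record, ToIntFunction<T> blockingInsert, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> blockingInsert.applyAsInt(record), pool)
                .exceptionally(error -> 0);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Stand-in for mapper.insertUserBehavior(record): pretends one row was inserted.
            CompletableFuture<Integer> done = insertAsync("click-record", record -> 1, pool);
            System.out.println("inserted rows: " + done.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the Flink function, the returned future's callback would be the place to call resultFuture.complete(...). Note that a single shared SqlSession is not thread-safe, so a pooled variant would presumably open one session per task rather than reuse the field created in open().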

Incrementally count the user click behavior:

public  class UserBehavorCountAggregateUtils implements AggregateFunction<UserBehavingInfo, Integer, Integer> {

    @Override
    public Integer createAccumulator() {
        return 0;
    }

    // called once per record
    @Override
    public Integer add(UserBehavingInfo userBehavingInfo, Integer integer) {
        return integer + 1;
    }

    // called once each time the window fires
    @Override
    public Integer getResult(Integer integer) {
        return integer;
    }

    // combines two partial counts when window state is merged
    @Override
    public Integer merge(Integer integer, Integer acc1) {
        return integer+acc1;
    }
    }

}
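The accumulator lifecycle can be walked through in plain Java. This sketch mirrors the four methods above with hypothetical static helpers and string events instead of UserBehavingInfo; it is only meant to show that a single Integer per key and window is kept as state, rather than the buffered events.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of the create / add / merge / getResult lifecycle of a counting accumulator. */
class CountAccumulatorSketch {

    static Integer createAccumulator() {
        return 0;
    }

    static Integer add(String event, Integer accumulator) {
        return accumulator + 1; // the event content is irrelevant, only its arrival counts
    }

    static Integer merge(Integer left, Integer right) {
        return left + right;
    }

    static Integer getResult(Integer accumulator) {
        return accumulator;
    }

    /** Folds a batch of events into one accumulator, one add call per event. */
    static Integer accumulate(List<String> events) {
        Integer accumulator = createAccumulator();
        for (String event : events) {
            accumulator = add(event, accumulator);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        Integer first = accumulate(Arrays.asList("click", "click"));
        Integer second = accumulate(Arrays.asList("click", "click", "click"));
        System.out.println("clicks: " + getResult(merge(first, second)));
    }
}
```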

Window function that builds the tuple:

public class UserBehavorCountWindowFunction extends ProcessWindowFunction<Integer, Tuple2<String, Integer>, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Integer> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
        collector.collect(new Tuple2<String, Integer>(key,iterable.iterator().next()));
    }

}

Write the product-click tuples to a Redis zset:

public  class UserBehaviorRedisMapper implements RedisMapper<Tuple2<String, Integer>> {

    // set the Redis command; rank:click is the name of the zset
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.ZADD,"rank:click");
    }

    // get the key from the data (the product id)
    @Override
    public String getKeyFromData(Tuple2<String, Integer> stringEventDetailsTuple2) {
        return stringEventDetailsTuple2.f0;
    }
    // get the value from the data (the click count, used as the score)
    @Override
    public String getValueFromData(Tuple2<String, Integer> stringEventDetailsTuple2) {
        return String.valueOf(stringEventDetailsTuple2.f1);
    }
}
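To illustrate what ends up in rank:click, here is a small in-memory stand-in for the zset in plain Java. It is not a Redis client: the class and its methods are hypothetical and only imitate two properties of a sorted set, namely that adding an existing member overwrites its score and that members can be read back ordered by score. The product ids are made up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** In-memory imitation of a sorted set holding click counts per product. */
class ClickRankSketch {
    private final Map<String, Double> scores = new HashMap<>();

    /** Like ZADD: inserts the member or overwrites its previous score. */
    void zadd(String member, double score) {
        scores.put(member, score);
    }

    /** Returns up to n members, highest score first (ties broken by member name in this sketch). */
    List<String> topN(int n) {
        return scores.entrySet().stream()
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Double>comparingByKey()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ClickRankSketch rank = new ClickRankSketch();
        rank.zadd("goods-1", 3);  // result of an earlier window
        rank.zadd("goods-2", 7);
        rank.zadd("goods-1", 9);  // a later window overwrites the old score
        System.out.println(rank.topN(2));
    }
}
```

Since each window firing overwrites the score of a product, the zset reflects the most recently emitted count per product; a product that receives no clicks in later windows keeps its old score unless it is cleaned up separately.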

This is my configuration for integrating MyBatis with Flink:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <settings>
        <setting name="defaultExecutorType" value="BATCH" />
    </settings>
    <environments default="development">
        <environment id="development">
            <transactionManager type="JDBC" />
            <dataSource type="config.DruidDataSourceFactory">
                <property name="driver" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://127.0.0.1:3306/risk_control?useSSL=false&amp;characterEncoding=utf8&amp;serverTimezone=GMT%2B8&amp;allowPublicKeyRetrieval=true" />
                <property name="username" value="root" />
                <property name="password" value="root" />
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="mapper/EventDetailsMapper.xml" />
        <mapper resource="mapper/UserBehaviorDetailsMapper.xml" />
    </mappers>
</configuration>

