java使用flink集成mybatis每五分鍾實時計算小時內用戶行為數據
目前在學習flink,寫了一個比較常見的需求:每五分鍾統計前一小時用戶點擊最多的商品,並且把源數據存入mysql.
實現思路:
使用滑動窗口(size 1h,間隔5分鍾),以商品作為keyBy的分組鍵,過濾掉不是點擊的數據;用aggregate函數增量計算每一個商品被點擊的數量,再用ProcessWindowFunction組成二元組<商品id,點擊數量>,最后存入redis zset類型中:zset的key為"rank:click",以商品id為member,點擊次數為score.
異步地將所有用戶行為數據和遲到數據存入mysql
下面是我的代碼
用戶行為實體類:
/**
 * A single user-behavior event, JSON-decoded from the Kafka topic "eventDetails".
 * Lombok {@code @Data} generates getters/setters/equals/hashCode/toString.
 */
@Data
public class UserBehavingInfo {
// user identifier (not otherwise used by the job shown here)
private String userNo;
/**
 * behavior type, e.g. "click" — the job filters on this value
 */
private String behavior;
/**
 * id of the goods the behavior acted on; used as the keyBy key
 */
private String operatedGoods;
/**
 * timestamp when the behavior occurred; assigned as the Flink event time
 * (presumably epoch milliseconds — the watermark code subtracts 5*1000L)
 */
private Long time;
}
main函數
/**
 * Flink job: every 5 minutes, rank the goods clicked most in the previous hour.
 * Pipeline: Kafka -> parse JSON -> (async insert raw events into MySQL)
 *           -> filter clicks -> sliding event-time window keyed by goods id
 *           -> incremental count -> Redis sorted set; late data -> MySQL.
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Windows are driven by event time (the timestamp carried inside each record).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.219.128:9092");
    properties.setProperty("group.id", "event");
    // FIX: a Kafka *consumer* takes DEserializer properties; the original set
    // key.serializer/value.serializer, which a consumer silently ignores.
    // (Flink decodes the payload with SimpleStringSchema below, so these are informational.)
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    FlinkKafkaConsumer010<String> kafkaConsumer =
            new FlinkKafkaConsumer010<>("eventDetails", new SimpleStringSchema(), properties);
    // Flink checkpoints the Kafka offsets as operator state; resume from the group's committed offsets.
    kafkaConsumer.setStartFromGroupOffsets();

    // Parse each JSON record into a UserBehavingInfo and assign timestamps/watermarks,
    // tolerating records that arrive up to 5 seconds out of order.
    SingleOutputStreamOperator<UserBehavingInfo> input = env.addSource(kafkaConsumer)
            .map(string -> JSON.parseObject(string, UserBehavingInfo.class))
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<UserBehavingInfo>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(UserBehavingInfo info) {
                            return info.getTime();
                        }
                    })
            .setParallelism(1);

    // Asynchronously persist every raw behavior record to MySQL.
    // FIX: the original passed TimeUnit.MICROSECONDS, so the intended 1 s timeout
    // was actually 1 ms and every request would time out almost immediately.
    // Capacity 10: more than 10 in-flight requests back-pressures the upstream operator.
    AsyncDataStream.unorderedWait(input, new AsyncInsertUserBehaviorToMysql(), 1000, TimeUnit.MILLISECONDS, 10);

    // Keep only click events for the ranking.
    SingleOutputStreamOperator<UserBehavingInfo> filterClick = input.filter(new FilterFunction<UserBehavingInfo>() {
        @Override
        public boolean filter(UserBehavingInfo userBehavingInfo) throws Exception {
            return "click".equals(userBehavingInfo.getBehavior());
        }
    });

    // Side output for records that arrive after the allowed lateness has expired.
    OutputTag<UserBehavingInfo> lateOutputUserBehavior = new OutputTag<UserBehavingInfo>("late-userBehavior-data") {};

    SingleOutputStreamOperator<Tuple2<String, Integer>> aggregateUserClick = filterClick
            .keyBy(new KeySelector<UserBehavingInfo, String>() {
                @Override
                public String getKey(UserBehavingInfo userBehavingInfo) throws Exception {
                    return userBehavingInfo.getOperatedGoods();
                }
            })
            // Production window is 1 h sliding every 5 min; 10 s / 5 s are local-testing values.
            .window(SlidingEventTimeWindows.of(
                    // Time.hours(1), Time.minutes(5)
                    Time.seconds(10), Time.seconds(5)
            ))
            .allowedLateness(Time.hours(1))
            .sideOutputLateData(lateOutputUserBehavior)
            // Incremental per-goods click count, then wrapped as (goodsId, count).
            .aggregate(new UserBehavorCountAggregateUtils(), new UserBehavorCountWindowFunction());
    aggregateUserClick.print();

    // Late records never trigger a window; persist them to MySQL instead.
    // FIX: same MICROSECONDS -> MILLISECONDS timeout correction as above.
    AsyncDataStream.unorderedWait(aggregateUserClick.getSideOutput(lateOutputUserBehavior),
            new AsyncInsertUserBehaviorToMysql(), 1000, TimeUnit.MILLISECONDS, 10);

    // Sink the (goodsId, count) tuples into the Redis sorted set "rank:click".
    FlinkJedisPoolConfig redis = new FlinkJedisPoolConfig.Builder()
            .setDatabase(1).setHost("192.168.219.128").setPort(6379).setPassword("redis").build();
    aggregateUserClick.addSink(new RedisSink<>(redis, new UserBehaviorRedisMapper()));

    env.execute("userClickBehaviorRank");
}
使用阿里巴巴提供的 異步io訪問數據庫,將用戶行為數據存入數據庫
/**
 * Asynchronously inserts user-behavior records into MySQL through MyBatis.
 *
 * FIX: the original implementation did the JDBC work synchronously inside
 * {@code asyncInvoke}, blocking the async-IO operator thread and defeating the
 * purpose of {@code AsyncDataStream}. The work is now handed to a small private
 * executor, and a fresh {@code SqlSession} is opened per request because
 * SqlSession instances are not thread-safe and must not be shared.
 */
public class AsyncInsertUserBehaviorToMysql extends RichAsyncFunction<UserBehavingInfo, Integer> {
// static so the (non-serializable) logger is not captured when Flink serializes this function
private static final Logger logger = LoggerFactory.getLogger(AsyncInsertUserBehaviorToMysql.class);
// executor that performs the blocking JDBC calls; created in open(), released in close()
private transient java.util.concurrent.ExecutorService executor;
/**
 * Initializes the worker pool used for the blocking database calls.
 *
 * @param parameters Flink configuration (unused)
 * @throws Exception never thrown here; declared by the interface
 */
@Override
public void open(Configuration parameters) throws Exception {
    logger.info("async function for mysql java open ... {}", Thread.currentThread().getName());
    super.open(parameters);
    // Small fixed pool: capacity of the async operator is 10, so 4 workers is a
    // reasonable default; tune together with the AsyncDataStream capacity.
    executor = java.util.concurrent.Executors.newFixedThreadPool(4);
}
/**
 * Inserts one record off the operator thread and completes the future with the
 * number of rows written (0 on failure).
 *
 * @param asyncUser    the behavior record to persist
 * @param resultFuture must ALWAYS be completed, otherwise the record times out
 * @throws Exception declared by the interface; errors are handled in the task
 */
@Override
public void asyncInvoke(UserBehavingInfo asyncUser, ResultFuture<Integer> resultFuture) throws Exception {
    executor.execute(() -> {
        Integer insertNum = 0;
        // one session per request — SqlSession is not thread-safe
        SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory().openSession();
        try {
            UserBehaviorDetailsMapper mapper = sqlSession.getMapper(UserBehaviorDetailsMapper.class);
            insertNum = mapper.insertUserBehavior(asyncUser);
            sqlSession.commit();
            logger.debug("插入數據庫{}", insertNum);
        } catch (Exception throwable) {
            sqlSession.rollback();
            logger.error("異常回滾", throwable);
        } finally {
            sqlSession.close();
            // Always complete the future, otherwise every record is reported as timed out.
            resultFuture.complete(Collections.singletonList(insertNum));
        }
    });
}
/**
 * Releases the worker pool.
 *
 * @throws Exception declared by the interface
 */
@Override
public void close() throws Exception {
    logger.info("async function for mysql java close ...");
    if (executor != null) {
        executor.shutdown();
    }
    super.close();
}
}
增量計算用戶點擊行為數量
/**
 * Incremental click counter: the accumulator is a plain Integer that grows by
 * one for every element seen in the window; the record content itself is ignored.
 */
public class UserBehavorCountAggregateUtils implements AggregateFunction<UserBehavingInfo, Integer, Integer> {

    /** A fresh window starts from a zero count. */
    @Override
    public Integer createAccumulator() {
        return 0;
    }

    /** Called once per element: only the count matters, not the event payload. */
    @Override
    public Integer add(UserBehavingInfo event, Integer count) {
        return count + 1;
    }

    /** Called when the window fires: the accumulator already IS the result. */
    @Override
    public Integer getResult(Integer count) {
        return count;
    }

    /** Combines two partial counts (e.g. when session/merging windows combine panes). */
    @Override
    public Integer merge(Integer left, Integer right) {
        return Integer.sum(left, right);
    }
}
窗口方法 組成二元組
/**
 * Wraps the pre-aggregated per-key click count into a (goodsId, clickCount)
 * tuple. The upstream AggregateFunction leaves exactly one value per pane,
 * so only the first element of the iterable is consumed.
 */
public class UserBehavorCountWindowFunction extends ProcessWindowFunction<Integer, Tuple2<String, Integer>, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Integer> counts, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer clickCount = counts.iterator().next();
        out.collect(Tuple2.of(key, clickCount));
    }
}
將商品點擊信息二元組存入redis zset類型
/**
 * Maps (goodsId, clickCount) tuples onto the Redis sorted set "rank:click":
 * member = goods id, score = click count.
 */
public class UserBehaviorRedisMapper implements RedisMapper<Tuple2<String, Integer>> {

    /** ZADD into the fixed sorted set named "rank:click". */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.ZADD, "rank:click");
    }

    /** The sorted-set member: the goods id. */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> goodsClicks) {
        return goodsClicks.f0;
    }

    /** The sorted-set score: the click count rendered as a string. */
    @Override
    public String getValueFromData(Tuple2<String, Integer> goodsClicks) {
        return String.valueOf(goodsClicks.f1);
    }
}
這是我flink集成mybatis的配置
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<settings>
<!-- Batch executor: reuses prepared statements and batches updates. -->
<setting name="defaultExecutorType" value="BATCH" />
</settings>
<environments default="development">
<environment id="development">
<transactionManager type="JDBC" />
<dataSource type="config.DruidDataSourceFactory">
<property name="driver" value="com.mysql.jdbc.Driver" />
<!-- FIX: raw '&' is not well-formed inside an XML attribute; each one must be
     escaped as '&amp;'. Also, the original had serverTimezone=GMT%28, which
     decodes to "GMT(" and is rejected by the driver — presumably GMT%2B8
     ("GMT+8") was intended; confirm against your server's time zone. -->
<property name="url" value="jdbc:mysql://127.0.0.1:3306/risk_control?useSSL=false&amp;characterEncoding=utf8&amp;serverTimezone=GMT%2B8&amp;allowPublicKeyRetrieval=true" />
<property name="username" value="root" />
<property name="password" value="root" />
</dataSource>
</environment>
</environments>
<mappers>
<mapper resource="mapper/EventDetailsMapper.xml" />
<mapper resource="mapper/UserBehaviorDetailsMapper.xml" />
</mappers>
</configuration>