kafka-重復消費-2


場景:

kafka先批量拉取消息,完了將拉下來的消息逐條消費,假如此次共拉取40條消息,但在處理第31條時,線程被停掉,就會導致已消費消息offet不會被提交,接下來已經被消費的30條記錄還會被重復消費,這就是kafka重復消費的另一場景;

解決思路:

解決此類重復消費的方式:將能夠唯一標識消息的信息存儲在其他系統,比如redis,什么能夠唯一標識消息呢?就是consumergroup+topic+partition+offset,更准確的應該是consumergroup+"_"+topic+"_"+partition+"_"+offset組成的key,value可以是處理時間存放在redis中,每次處理kafka消息時先從redis中根據key獲取value,如果value為空,則表明該消息是第一次被消費,不為空則表示時已經被消費過的消息;

代碼如下:

 1     private void handleRecord(String recordValue,long offset,int partition,String topic) {
 2 
 3         StringBuilder redisKey = new StringBuilder();
 4         redisKey.append(consumergroup).append("_").append(topic).append("_").append(partition).append("_").append(offset);
 5         logger.info("redisKey:{}",redisKey.toString());
 6         String consumeDate = (String) redisCacheUtil.get(redisKey.toString());
 7         if (StringUtils.isNotEmpty(consumeDate)) {
 8             logger.info("重復消費,跳過!redisKey:{},recordValue:{},消費時間:{}",redisKey,recordValue,consumeDate);
 9             return;
10         }
11         redisCacheUtil.set(redisKey.toString(),DateUtils.dateToString());
12         //設置過期時間,要稍大於kafka消息保存時間
13         redisCacheUtil.expire(redisKey.toString(),ComConstant.REDIS_EXPIRE_TIME_8_DAYS);
14         
15         //核心業務邏輯
16     }
17     
18     private void consumeMessage() {
19         logger.debug("consumeMessage() -- BEGEN");
20         ConsumerRecords<String, String> records = consumer.poll(10000);
21 
22         logger.info("獲取未消費記錄數:{}", records.count());
23         for (ConsumerRecord<String, String> record : records) {
24             logger.info("消費消息:{}", record);
25             try {
26                handleRecord(record.value(),record.offset(),record.partition(),record.topic());
27             }catch (Exception e){
28                 LoggerUtils.error(logger, e, "kafka處理消息失敗!");
29                 //失敗時的處理邏輯
30             }
31         }
32         if (records.count() > 0) {
33             logger.info("更新offset");
34             consumer.commitSync();
35         }
36         logger.debug("consumeMessage() -- END");
37     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM