場景:
kafka先批量拉取消息,完了將拉下來的消息逐條消費,假如此次共拉取40條消息,但在處理第31條時,線程被停掉,就會導致已消費消息offet不會被提交,接下來已經被消費的30條記錄還會被重復消費,這就是kafka重復消費的另一場景;
解決思路:
解決此類重復消費的方式:將能夠唯一標識消息的信息存儲在其他系統,比如redis,什么能夠唯一標識消息呢?就是consumergroup+topic+partition+offset,更准確的應該是consumergroup+"_"+topic+"_"+partition+"_"+offset組成的key,value可以是處理時間存放在redis中,每次處理kafka消息時先從redis中根據key獲取value,如果value為空,則表明該消息是第一次被消費,不為空則表示時已經被消費過的消息;
代碼如下:
1 private void handleRecord(String recordValue,long offset,int partition,String topic) { 2 3 StringBuilder redisKey = new StringBuilder(); 4 redisKey.append(consumergroup).append("_").append(topic).append("_").append(partition).append("_").append(offset); 5 logger.info("redisKey:{}",redisKey.toString()); 6 String consumeDate = (String) redisCacheUtil.get(redisKey.toString()); 7 if (StringUtils.isNotEmpty(consumeDate)) { 8 logger.info("重復消費,跳過!redisKey:{},recordValue:{},消費時間:{}",redisKey,recordValue,consumeDate); 9 return; 10 } 11 redisCacheUtil.set(redisKey.toString(),DateUtils.dateToString()); 12 //設置過期時間,要稍大於kafka消息保存時間 13 redisCacheUtil.expire(redisKey.toString(),ComConstant.REDIS_EXPIRE_TIME_8_DAYS); 14 15 //核心業務邏輯 16 } 17 18 private void consumeMessage() { 19 logger.debug("consumeMessage() -- BEGEN"); 20 ConsumerRecords<String, String> records = consumer.poll(10000); 21 22 logger.info("獲取未消費記錄數:{}", records.count()); 23 for (ConsumerRecord<String, String> record : records) { 24 logger.info("消費消息:{}", record); 25 try { 26 handleRecord(record.value(),record.offset(),record.partition(),record.topic()); 27 }catch (Exception e){ 28 LoggerUtils.error(logger, e, "kafka處理消息失敗!"); 29 //失敗時的處理邏輯 30 } 31 } 32 if (records.count() > 0) { 33 logger.info("更新offset"); 34 consumer.commitSync(); 35 } 36 logger.debug("consumeMessage() -- END"); 37 }
