kafka避免重復消費實際上是修改offset的提交方式。但是如果前一段消費失敗通常是采取繼續發送到該topic下,但是調用的服務已經掛了,如果
服務一直處於異常意味着,要不停的重復失敗的數據。會變成死循環,不停報錯,浪費系統資源。
解決方案:將處理失敗的信息發送到指定topic底下,該topic監聽器采用定時器開關,每天定時去讀取操作失敗的數據。
具體操作:
- 設置監聽器啟動方式或者不設置
- 設置需要設置定時開關監聽器的id
- 利用定時器定時開關指定監聽器
@Component public class TaskListener{ private static final Logger log= LoggerFactory.getLogger(TaskListener.class); @Autowired private KafkaListenerEndpointRegistry registry; @Autowired private ConsumerFactory consumerFactory;
//如果在kafkaconfiguration里面有設置過的話,不需要再設置 @Bean public ConcurrentKafkaListenerContainerFactory delayContainerFactory() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止自動啟動,這個是全局監聽器的。可以根據具體業務設置 container.setAutoStartup(false); return container; } @KafkaListener(id = "durable", topics = "topic.durable",containerFactory = "delayContainerFactory") public void durableListener(String data) { //這里做數據持久化的操作 log.info("topic.quick.durable receive : " + data); }
}
@Component @EnableScheduling @Slf4j public class TaskDemo { @Autowired private KafkaListenerEndpointRegistry registry; @Scheduled(cron = "0/30 * * * * ?") public void startRun(){ //判斷監聽容器是否啟動,未啟動則將其啟動 if (!registry.getListenerContainer("durable").isRunning()) { log.info("開啟監聽器"); registry.getListenerContainer("durable").start(); } } @Scheduled(cron = "0/10 * * * * ?") public void stopRun(){ //判斷監聽容器是否啟動,未啟動則將其啟動 if (registry.getListenerContainer("durable").isRunning()) { log.info("關閉監聽器"); registry.getListenerContainer("durable").stop(); } }