史上最差的kafka教程第五天(kafka避免重復消費與避免異常處理死循環)


kafka避免重復消費實際上是修改offset的提交方式。但是如果前一段消費失敗通常是采取繼續發送到該topic下,但是調用的服務已經掛了,如果

服務一直處於異常意味着,要不停的重復失敗的數據。會變成死循環,不停報錯,浪費系統資源。

解決方案:將處理失敗的信息發送到指定topic底下,該topic監聽器采用定時器開關,每天定時去讀取操作失敗的數據。

具體操作:

  1. 設置監聽器啟動方式或者不設置
  2. 設置需要設置定時開關監聽器的id
  3. 利用定時器定時開關指定監聽器
@Component
public class TaskListener{

    private static final Logger log= LoggerFactory.getLogger(TaskListener.class);

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;
//如果在kafkaconfiguration里面有設置過的話,不需要再設置 @Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止自動啟動,這個是全局監聽器的。可以根據具體業務設置 container.setAutoStartup(false); return container; } @KafkaListener(id = "durable", topics = "topic.durable",containerFactory = "delayContainerFactory") public void durableListener(String data) { //這里做數據持久化的操作 log.info("topic.quick.durable receive : " + data); }
}

 

 

@Component
@EnableScheduling
@Slf4j
public class TaskDemo {

    @Autowired
    private KafkaListenerEndpointRegistry registry;


    @Scheduled(cron = "0/30 * * * * ?")
    public void startRun(){
        //判斷監聽容器是否啟動,未啟動則將其啟動
        if (!registry.getListenerContainer("durable").isRunning()) {
            log.info("開啟監聽器");
            registry.getListenerContainer("durable").start();
        }
    }


    @Scheduled(cron = "0/10 * * * * ?")
    public void stopRun(){
        //判斷監聽容器是否啟動,未啟動則將其啟動
        if (registry.getListenerContainer("durable").isRunning()) {
            log.info("關閉監聽器");
            registry.getListenerContainer("durable").stop();
        }
      
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM