Spring-Kafka —— KafkaListener定時啟動和停止


 

一、定時啟動的應用場景

比如現在單機環境下,我們需要利用Kafka做數據持久化的功能,由於用戶活躍的時間為早上10點至晚上12點,那在這個時間段做一個大數據量的持久化可能會影響數據庫性能導致用戶體驗降低,我們可以選擇在用戶活躍度低的時間段去做持久化的操作,也就是晚上12點后到第二條的早上10點前。
 

二、實現思路

  1. 禁止KafkaListener自啟動(AutoStartup)
  2. 編寫兩個定時任務,一個晚上12點,一個早上10點
  3. 分別在12點的任務上啟動KafkaListener,在10點的任務上關閉KafkaListener

這里需要注意一下啟動監聽容器的方法,項目啟動的時候監聽容器是未啟動狀態,而resume是恢復的意思不是啟動的意思,所以我們需要判斷容器是否運行,如果運行則調用resume方法,否則調用start方法

 

三、實現代碼

@Component
@EnableScheduling
public class TaskListener{

    private static final Logger log= LoggerFactory.getLogger(TaskListener.class);

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自動啟動
        container.setAutoStartup(false);
        return container;
    }

    @KafkaListener(id = "durable", topics = "topic.quick.durable",containerFactory = "delayContainerFactory")
    public void durableListener(String data) {
        //這里做數據持久化的操作
        log.info("topic.quick.durable receive : " + data);
    }


    //定時器,每天凌晨0點開啟監聽
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        log.info("開啟監聽");
        //判斷監聽容器是否啟動,未啟動則將其啟動
        if (!registry.getListenerContainer("durable").isRunning()) {
            registry.getListenerContainer("durable").start();
        }
        registry.getListenerContainer("durable").resume();
    }

    //定時器,每天早上10點關閉監聽
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        log.info("關閉監聽");
        registry.getListenerContainer("durable").pause();
    }



}

 

 

參考:

  https://www.jianshu.com/p/2447592ca5a9


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM