kafka消息保留策略設置


項目遇到一個問題,kafka服務器被下電了4天,消息保留策略設置的24小時。服務器上電后,消息過期,消費者拿不到消息。模擬這種場景,復現下

log.retention.minutes=1

設置消息保留時間為1分鍾

log.retention.hours

也可以設置多個小時,默認是168個小時,7天。

log.retention.check.interval.ms=10000

同時設置檢查過期消息間隔為10秒,為了測試

    public void sendMsg() {
        int messageNo = 1;
        try {
            for(;;) {
                String messageStr="你好,這是第"+messageNo+"條數據";
                producer.send(new ProducerRecord<String, String>(topic,  messageStr));
                System.out.println("發送的信息:" + messageStr);
                Thread.sleep(1000);
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

通過java生產者想kafka發送消息

過兩分鍾后,消費者消費

    public void receiveMsg() {
        int messageNo = 1;
        System.out.println("---------開始消費---------");
        try {
            for (;;) {
                msgList = consumer.poll(1000);
                if(null!=msgList&&msgList.count()>0){
                    for (ConsumerRecord<String, String> record : msgList) {
                        System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                    }
                }else{
                    Thread.sleep(1000);
                }
                consumer.commitSync();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

可以看到消費者獲取不到消息,應為消息已經過期


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM