項目遇到一個問題,kafka服務器被下電了4天,消息保留策略設置的24小時。服務器上電后,消息過期,消費者拿不到消息。模擬這種場景,復現下
log.retention.minutes=1
設置消息保留時間為1分鍾
log.retention.hours
也可以設置多個小時,默認是168個小時,7天。
log.retention.check.interval.ms=10000
同時設置檢查過期消息間隔為10秒,為了測試
public void sendMsg() { int messageNo = 1; try { for(;;) { String messageStr="你好,這是第"+messageNo+"條數據"; producer.send(new ProducerRecord<String, String>(topic, messageStr)); System.out.println("發送的信息:" + messageStr); Thread.sleep(1000); messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } }
通過java生產者想kafka發送消息
過兩分鍾后,消費者消費
public void receiveMsg() { int messageNo = 1; System.out.println("---------開始消費---------"); try { for (;;) { msgList = consumer.poll(1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); } }else{ Thread.sleep(1000); } consumer.commitSync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } }
可以看到消費者獲取不到消息,應為消息已經過期