springboot kafka發送消息支持成功失敗通知


springboot集成kafka是比較簡單的是事情,但是kafka發送消息的失敗回調在日常工作中,如果不容忍消息丟失的話,發送失敗需要再次發送或者放到數據庫中用任務重推。
以下是演示用的發送類代碼

@Slf4j
@Component
public class TestRunner implements ApplicationRunner {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        KafkaMsgEntity kafkaMsgEntity = new KafkaMsgEntity();
        kafkaMsgEntity.setActionName("login");
        String tmpStr = "id:%d,msg:login";
        for (int i = 1; i < 500; i++) {
            String tmpStr1 = tmpStr.replace("%d", String.valueOf(i));
            Thread.sleep(500);
            kafkaMsgEntity.setMsgBody(tmpStr1);
            kafkaTemplate.send("test", JSON.toJSONString(kafkaMsgEntity)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    if (throwable instanceof KafkaProducerException) {
                        String value = (String) ((KafkaProducerException) throwable).getProducerRecord().value();
                        log.info("{} get throwable msg:{}", value, throwable.getMessage());
                    } else {
                        log.info("get throwable msg:{}", throwable.getMessage());
                    }
                }

                @Override
                public void onSuccess(SendResult<String, String> o) {
                    log.info("{}, success", o.getProducerRecord().value());
                }
            });
        }
    }
}

在kafka運行過程中kill進程達到異常發送的條件。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM