Controller
/**
* 回調方法中監控消息是否發送成功 或 失敗時做補償處理
*/
@GetMapping("/callback/{message}")
public String sendMessageCallback(@PathVariable("message") String callbackMessage) {
kafkaProducer.sendMessageCallback(callbackMessage);
return "niu bi a";
}
Service :kafka producer
/**
* callback
*/
public void sendMessageCallback(String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("發送消息失敗:" + ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("發送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
}
消費者:
@Service
public class KafkaConsumer3 {
@KafkaListener(topics = "topic1", groupId = KafkaConstants.KAFKA_GROUP_ID_THREE)
public void receiveTopicMessage(String message) {
System.out.println("KafkaConsumer3 ---> receiveTopicMessage:接收消息,內容為:" + message);
}
}
結果:
=====
Kafka問題1:
win端查看topic內容,中文亂碼:
輸入:CHCP 65001
原因:kafka默認編碼是utf-8,但win10是gbk,所以中文亂碼,需在cmd下設置本機局部編碼格式位utf-8