Kafka回調函數


Controller

    /**
     * 回調方法中監控消息是否發送成功 或 失敗時做補償處理
     */
    @GetMapping("/callback/{message}")
    public String sendMessageCallback(@PathVariable("message") String callbackMessage) {
        kafkaProducer.sendMessageCallback(callbackMessage);
        return "niu bi a";
    }

Service :kafka producer

    /**
     * callback
     */
    public void sendMessageCallback(String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("發送消息失敗:" + ex.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("發送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }
}

消費者:

@Service
public class KafkaConsumer3 {
    @KafkaListener(topics = "topic1", groupId = KafkaConstants.KAFKA_GROUP_ID_THREE)
    public void receiveTopicMessage(String message) {
        System.out.println("KafkaConsumer3 ---> receiveTopicMessage:接收消息,內容為:" + message);
    }
}

結果:
image

=====

image

Kafka問題1:
win端查看topic內容,中文亂碼:
輸入:CHCP 65001

原因:kafka默認編碼是utf-8,但win10是gbk,所以中文亂碼,需在cmd下設置本機局部編碼格式位utf-8
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM