RocketMQ(6)---發送普通消息(三種方式)


發送普通消息(三種方式)

RocketMQ 發送普通消息有三種實現方式:可靠同步發送可靠異步發送單向(Oneway)發送

注意順序消息只支持可靠同步發送

GitHub地址: https://github.com/yudiandemingzi/SpringBootBlog

一、概念

1、可靠同步發送

原理:同步發送是指消息發送方發出數據后,會在收到接收方發回響應之后才發下一個數據包的通訊方式。

應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。

2、可靠異步發送

原理:異步發送是指發送方發出數據后,不等接收方發回響應,接着發送下個數據包的通訊方式。 消息隊列 RocketMQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback)。

應用場景:異步發送一般用於鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如批量發貨等操作。

3、單向(Oneway)發送

原理:單向(Oneway)發送特點為發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。 此方式發送消息的過程耗時非常短,一般在微秒級別。

應用場景:適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日志收集。

4、三種對比

下表概括了三者的特點和主要區別。

發送方式 發送 TPS 發送結果反饋 可靠性
同步發送 不丟失
異步發送 不丟失
單向發送 最快 可能丟失

二、代碼示例

1、三種方式代碼示例

@Slf4j
@RestController
public class Controller {
    /**
     * 生產者組
     */
    private static String PRODUCE_RGROUP = "test_producer";
    /**
     * 創建生產者對象
     */
    private static DefaultMQProducer producer = null;

    static {
        producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //不開啟vip通道 開通口端口會減2
        producer.setVipChannelEnabled(false);
        //綁定name server
        producer.setNamesrvAddr("47.99.03.25:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    @GetMapping("/message")
    public  void  message() throws Exception {
        //1、同步
        sync();
        //2、異步
        async();
        //3、單項發送
        oneWay();
    }
    /**
     * 1、同步發送消息
     */
    private  void sync() throws Exception {
        //創建消息
        Message message = new Message("topic_family", ("  同步發送  ").getBytes());
        //同步發送消息
        SendResult sendResult = producer.send(message);
        log.info("Product-同步發送-Product信息={}", sendResult);
    }
    /**
     * 2、異步發送消息
     */
    private  void async() throws Exception {
        //創建消息
        Message message = new Message("topic_family", ("  異步發送  ").getBytes());
        //異步發送消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Product-異步發送-輸出信息={}", sendResult);
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                //補償機制,根據業務情況進行使用,看是否進行重試
            }
        });
    }
    /**
     * 3、單項發送消息
     */
    private  void oneWay() throws Exception {
        //創建消息
        Message message = new Message("topic_family", (" 單項發送 ").getBytes());
        //同步發送消息
        producer.sendOneway(message);
    }
}

2、測試結果

這里消費者代碼就不貼出來了。

通過這個很明顯可以看出三種方式都被 Consumer 消費了。只不過對於 Product 同步和異步發送是有返回信息的,單項發送是沒有返回信息的。


三、SendStatus狀態

當Product發送消息的時候,會返回SendResult對象,該對象又包含了一個SendStatus對象。

package org.apache.rocketmq.client.producer;
public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

下面對這幾種狀態進行說明

SEND_OK

代表發送成功!但並不保證它是可靠的。要確保不會丟失任何消息,還應啟用SYNC_MASTER或SYNC_FLUSH。

SLAVE_NOT_AVAILABLE

如果Broker的角色是SYNC_MASTER(同步復制)(默認為異步),但沒有配置Slave Broker,將獲得此狀態。

FLUSH_DISK_TIMEOUT

如果Broker設置為 SYNC_FLUSH(同步刷盤)(默認為ASYNC_FLUSH),並且Broker的syncFlushTimeout(默認為5秒)內完成刷新磁盤,將獲得此狀態。

FLUSH_SLAVE_TIMEOUT

如果Broker的角色是SYNC_MASTER(同步復制)(默認為ASYNC_MASTER),並且從屬Broker的syncFlushTimeout(默認為5秒)內完成與主服務器的同步,將獲得此狀態。


參考

1、RocketMQ 阿里雲官網文檔



只要自己變優秀了,其他的事情才會跟着好起來(上將3)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM