RocketMQ之Producer


一、Producer 介紹

1.1 消息發送的步驟

1) 設置 Producer 的 GroupName(Producer Group是一類 Producer 的集合);
2) 設置 InstanceName,當一個 JVM 需要啟動多個 Producer 時,需要指定不同的 InstanceName 來區分,不顯式設置時使用默認名稱 "DEFAULT";
3) 設置發送失敗重試次數,默認值是2次,可能會出現重復消息,因此需要消費端進行控制;
4) 設置 NameServer 地址;
5) 組裝數據並發送    

1.2 生產者核心參數

* producerGroup:生產者組名
* createTopicKey:創建 Topic,生產環境一般不直接從代碼層面創建而是在控制台創建    
* defaultTopicQueueNums:每個 Topic 下的隊列數量,默認數量是4
* sendMsgTimeout:消息發送超時時間,單位ms
* compressMsgBodyOverHowmuch:當消息大小超過指定字節就會開啟壓縮,默認字節為4096
* retryTimesWhenSendFailed:同步模式下,消息發送失敗重試次數,默認2次
* retryTimesWhenSendAsyncFailed:異步模式下,消息發送失敗重試次數,默認2次
* retryAnotherBrokerWhenNotStoreOK:當 broker 接收失敗時,是否切換另一個 broker ,默認為 false 
* maxMessageSize:最大的消息容量限制,默認是4M    

二、不同類型的生產者

生產者向消息隊列中寫入消息,根據不同的業務場景需要采用不同的寫入策略,如同步發送、異步發送、延遲發送和發送事務消息等。

2.1 同步發送

public class Producer {
    
    public static void main(String[] args) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException {
        // 創建生產者對象
        DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
        // 設置實例化名稱
        producer.setInstanceName("SyncProducer");
        // 指定同步模式下,失敗重試次數
        producer.setRetryTimesWhenSendFailed(5);                
        // 設置服務器地址
        producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 啟動實例
        producer.start();
        // 實例化消息對象
        Message message = new Message("topicTest", "tagA", "同步消息發送".getBytes());
        // 同步發送消息
        SendResult sendResult = producer.send(message);
        System.out.printf("%s%n", sendResult);
        // 關閉生產者
        producer.shutdown();
    }
}

2.2 異步發送

public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException {
        // 創建生產者對象
        DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
        // 設置實例化名稱
        producer.setInstanceName("AsyncProducer");
        // 指定異步模式下,失敗重試次數
        producer.setRetryTimesWhenSendAsyncFailed(5);
        // 設置服務器地址
        producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 啟動實例
        producer.start();
        // 實例化消息對象
        Message message = new Message("topicTest", "tagA", "異步消息發送".getBytes());
        // 異步發送消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("%s%n", sendResult);
                // 關閉生產者
                producer.shutdown();
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
    }
}    

2.3 延遲發送

RocketMQ 支持發送延遲消息,Broker 在收到這類消息后,會延遲一段時間再處理,使消息在規定的一段時間后生效。

延遲消息的使用方法是在創建 Message 對象時,調用 setDelayTimeLevel(int level) 方法設置延遲時間。目前不支持自定義時間,只能使用預定義的時間長度,如 setDelayTimeLevel(3) 表示延遲10s。

public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException {
        // 創建生產者對象
        DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
        // 設置實例化名稱
        producer.setInstanceName("SyncProducer");
        // 指定同步模式下,失敗重試次數
        producer.setRetryTimesWhenSendFailed(5);
        // 設置服務器地址
        producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 啟動實例
        producer.start();
        // 實例化消息對象
        Message message = new Message("topicTest", "tagA", "延遲消息發送".getBytes());
        // 設置延遲時間,時間長度為(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
        message.setDelayTimeLevel(3);
        // 發送消息
        SendResult sendResult = producer.send(message);
        System.out.printf("%s%n", sendResult);
        // 關閉實例
        producer.shutdown();
    }
}    

2.4 自定義發送規則

一個 Topic 會有多個 Message QueueProducer 的默認配置會輪流向各個 Message Queue 發送消息。Consumer 在消費消息時,會根據負載均衡策略,消費被分配到的 Message Queue。如果要把消息發送到指定的 Message Queue,可以使用 Message-QueueSelector

public class MyMessageQueueSelector implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object key) {\
        // 自定義選擇 Message Queue 規則
        int id = Integer.parseInt(key.toString());
        int idMainIndex = id / 100;
        int size = mqs.size();
        int index = idMainIndex % size;
        return mqs.get(index);
    }
}

在發送消息的時候,將 MessageQueueSelector 對象作為參數即可。

public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException {
        // 創建生產者對象
        DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
        // 設置實例化名稱
        producer.setInstanceName("SyncProducer");
        // 指定同步模式下,失敗重試次數
        producer.setRetryTimesWhenSendFailed(5);
        // 設置服務器地址
        producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 啟動實例
        producer.start();
        // 實例化消息對象
        Message message = new Message("topicTest", "tagA", "自定義消息發送".getBytes());
        // 發送消息
        SendResult sendResult = producer.send(message,new MyMessageQueueSelector(),1000);
        System.out.printf("%s%n", sendResult);
        // 關閉實例
        producer.shutdown();
    }
}    

2.5 事務消息

當某幾件事需要同時成功或失敗的時候,就需要使用到事務功能,如銀行轉賬:A 銀行的某賬戶要轉一萬元到 B 銀行的某賬戶:

1) 從 A 賬戶扣除一萬元
2) 對 B 賬戶增加一萬元    

這兩個操作需要同時成功或同時失敗, RocketMQ 采用兩階段提交的方式實現事務消息,TransactionMqRroducer 處理流程如下:

1) 發送方向 RocketMQ 發送 "待確認" 消息;
2) RocketMQ 將收到的 "待確認" 消息持久化成功后,向發送方回復消息已經發送成功,此時第一階段消息發送完成;
3) 發送發開始執行本地事件邏輯
4) 發送方根據本地事件執行結果向 RocketMQ 發送二次確認(Commit 或 Rollback) 消息:
    * 接收到 commit 消息,將把第一階段消息標記為可投遞,訂閱方將會收到該消息;
    * 接收到 rollback 消息,將刪除第一階段消息,訂閱方不會接受到該消息;
5) 如果出現異常情況,步驟4 提交的二次確認最終未到達 RocketMQ ,服務器將經過固定時間段后將對 "待確認" 消息發起回查請求;
6) 發送方收到消息回傳請求后(如果第一階段發送的 Producer 不能工作時,將會回傳給同一個 ProducerGroup 的其他 Producer),通過對檢查對應消息的本地事件執行結果返回 Commit 或 Rollback 狀態;
7) RocketMQ 收到回查請求后,按照步驟4) 流程繼續處理    

RocketMQ 通過以下幾個類來支持用戶實現事務消息:

  • TransactionMQProducer

    和 DefaultMQProducer 用戶類似,通過它啟動事務消息,相比 DefaultMQProducer 需要多設置本地事務處理函數和回查狀態函數
    
  • TransactionListener

    提供本地執行方法和回查方法,返回 LocalTransactionState 狀態標識:
        * LocalTransactionState.COMMIT_MESSAGE:提交
        * LocalTransactionState.ROLLBACK_MESSAGE:回滾
        * LocalTransactionState.UNKNOW:未知,需要回查
    

實現 TransactionListener 接口

public class MyTransactionListener implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private AtomicInteger checkTimes = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    // 執行本地事務
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String msgKey = msg.getKeys();
        System.out.println("start execute local transaction " + msgKey);
        LocalTransactionState state;
        if (msgKey.contains("1")) {
            // 第一條消息讓他通過
            state = LocalTransactionState.COMMIT_MESSAGE;
        } else if (msgKey.contains("2")) {
            // 第二條消息模擬異常,明確回復回滾操作
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            // 第三條消息無響應,讓它調用回查事務方法
            state = LocalTransactionState.UNKNOW;
            // 給剩下3條消息,放1,2,3三種狀態
            localTrans.put(msgKey, transactionIndex.incrementAndGet());
        }
        System.out.println("executeLocalTransaction:" + msg.getKeys() + ",execute state:" + state + ",current time:" + System.currentTimeMillis());
        return state;
    }

    // 檢查本地事務結果
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String msgKey = msg.getKeys();
        System.out.println("start check local transaction " + msgKey);
        Integer state = localTrans.get(msgKey);
        switch (state) {
            case 1:
                System.out.println("check result unknown 回查次數" + checkTimes.incrementAndGet());
                return LocalTransactionState.UNKNOW;
            case 2:
                System.out.println("check result commit message, 回查次數" + checkTimes.incrementAndGet());
                return LocalTransactionState.COMMIT_MESSAGE;
            case 3:
                System.out.println("check result rollback message, 回查次數" + checkTimes.incrementAndGet());
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

實現事務消息生產者

public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException {
        // 創建事務生產者對象
        TransactionMQProducer producer = new TransactionMQProducer("producerGroupName");
        // 設置實例化名稱
        producer.setInstanceName("SyncProducer");
        // 指定同步模式下,失敗重試次數
        producer.setRetryTimesWhenSendFailed(5);
        // 設置事務監聽器
        producer.setTransactionListener(new MyTransactionListener());
        // 設置服務器地址
        producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);

        // 啟動實例
        producer.start();
        for (int i = 0; i < 5; i++) {
            // 實例化消息對象
            Message message = new Message("topicTest", "tagA","msg-" + i, ("事務消息發送" + ":" +  i).getBytes());
            // 發送消息
            SendResult sendResult = producer.sendMessageInTransaction(message, i);
            System.out.printf("%s%n", sendResult);
        }

        // 關閉實例
        // producer.shutdown();
    }

}

三、消息返回狀態

3.1 獲取消息發送返回狀態對象

// 獲取發送消息結果
SendResult sendResult = producer.send(message);
// 從結果中獲取狀態對象
SendStatus sendStatus = sendResult.getSendStatus();

3.2 狀態值

public enum SendStatus {
    // 表示發送成功
    SEND_OK,
    // 表示沒有在指定時間內完成刷盤(需要 Broker 的刷盤策略被設置成SYNC_FLUSH)
    FLUSH_DISK_TIMEOUT,
    // 表示在主備模式下,並且 Broker 被設置成SYNC_MASTER,沒有在指定時間內完成主從同步
    FLUSH_SLAVE_TIMEOUT,
    // 表示在主備模式下,並且 Broker 被設置成SYNC_MASTER,沒有找到被配置成 Slave 的 Broker
    SLAVE_NOT_AVAILABLE,
}

注:當返回狀態不是 SEND_OK 時,都需要有補償機制。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM