RocketMQ 消息使用詳解


MQ

MQ 架構

image

Message 包含內容

image

RocketMQ 客戶端坐標

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

消息發送&消費模式

注意:當生產者出現找不到 topic 的報錯信息時,要檢查是否服務器的防火牆策略導致的。

One-To-One(單生產者單消費者)

生產者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.創建一個發送消息的對象Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");  // 入參為自定義組名
        // 2.設定發送的命名服務器地址
        producer.setNamesrvAddr("192.168.3.244:9876");
        // 設置發送消息超時時間(默認為3000)
        producer.setSendMsgTimeout(60000);
        // 3.啟動發送的服務
        producer.start();
        // 4.創建要發送的消息對象,指定topic和body
        Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
        // 5.發送單條消息
        SendResult result = producer.send(msg);
        // 打印返回消息
        System.out.println("返回結果:"+result);
        // 6.關閉連接
        producer.shutdown();
    }
}

運行結果

返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E658458644D464D61A1F80000, offsetMsgId=C0A803DE00002A9F0000000000000000, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=0]

消費者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1.創建一個接收消息的對象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");  // 入參為自定義組名
        // 2.設定接收的命名服務器地址
        consumer.setNamesrvAddr("192.168.3.244:9876");
        // 3.設置接收消息對應的topic,對應的sub標簽為任意*
        consumer.subscribe("topic1", "*");
        // 4.開啟監聽,用於接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 遍歷消息
                for(MessageExt msg : list){
                    // System.out.println("收到消息:"+msg);
                    System.out.println("消息:" + new String(msg.getBody()));
                }
                // 成功處理后,mq 收到該標記,則相同的消息將不會再次發給消費者
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.啟動接收消息的服務
        consumer.start();  // 開啟多線程監控消息,會持續運行
        System.out.println("接收消息服務已開啟運行");
    }
}

運行結果

接收消息服務已開啟運行
消息:hello rocketmq

One-To-Many(單生產者多消費者)

生產者

package onetoone;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.創建一個發送消息的對象Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");  // 入參為自定義組名
        // 2.設定發送的命名服務器地址
        producer.setNamesrvAddr("192.168.3.222:9876");
        // 設置發送消息超時時間(默認為3000)
        producer.setSendMsgTimeout(60000);
        // 3.啟動發送的服務
        producer.start();
        // 創建要發送的消息對象,指定topic和body
        // Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
        // 發送單條消息
        // SendResult result = producer.send(msg);
        // 4. 發送多條消息
        for (int i = 1; i <= 10; i++) {
            Message msg = new Message("topic1", ("生產者:hello rocketmq "+i).getBytes("UTF-8"));
            SendResult result = producer.send(msg);
            System.out.println("返回結果:" + result);
        }
        // 5.關閉連接
        producer.shutdown();
    }
}

運行結果

返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF300000, offsetMsgId=C0A803DE00002A9F0000000000000E79, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=4]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF7A0001, offsetMsgId=C0A803DE00002A9F0000000000000F2A, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=6]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF980002, offsetMsgId=C0A803DE00002A9F0000000000000FDB, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=6]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFAB0003, offsetMsgId=C0A803DE00002A9F000000000000108C, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=1], queueOffset=5]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFB70004, offsetMsgId=C0A803DE00002A9F000000000000113D, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=5]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFCC0005, offsetMsgId=C0A803DE00002A9F00000000000011EE, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=7]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFDF0006, offsetMsgId=C0A803DE00002A9F000000000000129F, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=7]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFEB0007, offsetMsgId=C0A803DE00002A9F0000000000001350, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=1], queueOffset=6]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFF20008, offsetMsgId=C0A803DE00002A9F0000000000001401, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=6]
返回結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFFD0009, offsetMsgId=C0A803DE00002A9F00000000000014B2, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=8]

消費者

負載均衡模式

負載均衡模式:即生產的多個消息(數量)會均衡分配給各個消費者。

package onetoone;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1.創建一個接收消息的對象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");  // 入參為自定義組名
        // 2.設定接收的命名服務器地址
        consumer.setNamesrvAddr("192.168.3.222:9876");
        // 3.設置接收消息對應的topic,對應的sub標簽為任意*
        consumer.subscribe("topic1", "*");

        // 設置當前消費者的消費模式:負載均衡(也是默認模式)
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 4.開啟監聽,用於接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 遍歷消息
                for(MessageExt msg : list){
                    // System.out.println("收到消息:"+msg);
                    System.out.println("消息:" + new String(msg.getBody()));
                }
                // 成功處理后,mq 收到該標記,則相同的消息將不會再次發給消費者
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.啟動接收消息的服務
        consumer.start();  // 開啟多線程監控消息,會持續運行
        System.out.println("接收消息服務已開啟運行");
    }
}

運行結果

image

廣播模式

廣播模式:即生產出來的同一條消息,會被每個消費者所接收。

廣播模式的現象

  1. 如果先生產,后消費,則消息只能被消費一次。
  2. 如果多個消費者先啟動(廣播模式),后生產,此時才有廣播的效果。
  3. 結論:必須先啟動消費者,再啟動生產者,才有廣播的效果。
package onetoone;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1.創建一個接收消息的對象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");  // 入參為自定義組名
        // 2.設定接收的命名服務器地址
        consumer.setNamesrvAddr("192.168.3.222:9876");
        // 3.設置接收消息對應的topic,對應的sub標簽為任意*
        consumer.subscribe("topic1", "*");

        // 設置當前消費者的消費模式為廣播模式:所有客戶端接收的消息都是一樣的
         consumer.setMessageModel(MessageModel.BROADCASTING);

        // 4.開啟監聽,用於接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 遍歷消息
                for(MessageExt msg : list){
                    // System.out.println("收到消息:"+msg);
                    System.out.println("消息:" + new String(msg.getBody()));
                }
                // 成功處理后,mq 收到該標記,則相同的消息將不會再次發給消費者
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.啟動接收消息的服務
        consumer.start();  // 開啟多線程監控消息,會持續運行
        System.out.println("接收消息服務已開啟運行");
    }
}

運行結果

image


Many-To-Many(多生產者多消費者)

多生產者產生的消息可以被同一個消費者消費,也可以被多個消費者消費。

(演示:將上述示例的生產者啟動多個實例即可)。


消息類別

同步消息

特征:即時性較強,通常是重要的且必須有回執的消息,例如短信、通知(轉賬成功)。

image

代碼實現

SendResult result = producer.send(msg);

異步消息

特征:即時性較弱,但需要有回執的消息,例如訂單中的某些信息。

image

代碼實現

// 注意:回調處理結果必須在生產者進程結束前執行,否則回調無法正確執行
producer.send(msg, new SendCallback() {
    // 表示成功返回結果
    public void onSuccess(SendResult sendResult) {
        System.out.println(sendResult);
        }
    // 表示發送消息失敗
    public void onException(Throwable t) {
        System.out.println(t);
    }
});
// 注意后面不要馬上shutdown(關閉)連接,確保留有時間接收到異步返回的消息

單向消息

特征:不需要有回執的消息,例如日志類的消息。

image

代碼實現

producer.sendOneway(msg);

延時消息

延時消息:指消息發送時並不直接發送到消息服務器,而是根據設定的等待時間才到達,起到延時到達的緩沖作用。

Message msg = new Message("topic3", ("延時消息:hello rocketmq "+i).getBytes("UTF-8"));
// 設置當前消息的延時效果
msg.setDelayTimeLevel(3);  // 入參表示下標3,代表30s(參考下表)
SendResult result = producer.send(msg);
System.out.println("返回結果:"+result);

目前支持的消息時間:

  • 秒級:1、5、10、30
  • 分級:1~10、20、30
  • 時級:1、2
  • 入參下標含義:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h

批量消息

// 創建消息集合創建要發送的消息對象,指定topic和body
ArrayList<Message> messageList = new ArrayList<>();
Message msg1 = new Message("topic1", "hello rocketmq 1".getBytes("UTF-8"));
Message msg2 = new Message("topic1", "hello rocketmq 2".getBytes("UTF-8"));
Message msg3 = new Message("topic1", "hello rocketmq 3".getBytes("UTF-8"));
messageList.add(msg1);
messageList.add(msg2);
messageList.add(msg3);
// (一次)發送批量消息
SendResult result = producer.send(messageList);
System.out.println("發送結果:"+result);

運行結果

發送結果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E681458644D4650530BBF0000,A9FE135E681458644D4650530BBF0001,A9FE135E681458644D4650530BBF0002, offsetMsgId=C0A803DE00002A9F0000000000001E3E,C0A803DE00002A9F0000000000001EE3,C0A803DE00002A9F0000000000001F88, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=13]

消息過濾

分類過濾

image

  • 生產者:發送時指定 tag
Message msg = new Message("topic6", "tag2", ("消息過濾按照tag:hello rocketmq 2").getBytes("UTF-8"));
  • 消費者:消費指定 tag 的消息
// 接收消息時,除了可以指定topic,還可以指定接收的tag(*代表任意tag,||代表或)
consumer.subscribe("topic6", "tag1 || tag2");

屬性過濾(SQL 過濾)

生產者

// 為消息添加屬性
msg.putUserProperty("vip", "1");
msg.putUserProperty("age", "20");

消費者

// 使用消息選擇器來過濾對應的屬性,語法格式為類SQL語法
consumer.subscribe("topic1", MessageSelector.bySql("age >= 18"));

配置

  • 注意:SQL 過濾需要依賴服務器的功能支持,在 broker.conf 配置文件中添加對應的功能項,並開啟對應功能。
enablePropertyFilter=true
  • 啟動服務器使啟用對應配置文件:
sh mqbroker -n localhost:9876 -c ../conf/broker.conf

消息順序

消息亂序

  • 默認情況下,MQ 開啟了多個隊列, 同時發送多個消息的的話,發送給哪個隊列是不確定的。同時消費者讀取消息,每讀取一個消息開啟一個線程,也不能保證消息的順序性。
    image

  • 想要保證消息的有序性,需要指定消息發送時的隊列。同時消費者應該一個隊列開啟一個線程進行接收,而不是一個消息一個線程。
    image


順序消息

生產者

// 設置消息進入到指定的消息隊列中
for(final Order order : orderList){
    Message msg = new Message("orderTopic", order.toString().getBytes());
    // 發送時要指定對應的消息隊列選擇器
    SendResult result = producer.send(msg, new MessageQueueSelector() {
        // 設置當前消息發送時使用哪一個消息隊列
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
            // 根據發送的信息不同,選擇不同的消息隊列
            // 根據id來選擇一個消息隊列的對象(用id的哈希值來取模)
            int mqIndex = order.getId().hashCode() % list.size();
            return list.get(mqIndex);
        }
    }, null);
    System.out.println(result);
}

消費者

// 使用單線程的模式從消息隊列中取數據,一個線程綁定一個消息隊列
consumer.registerMessageListener(new MessageListenerOrderly() {
    // 使用MessageListenerOrderly接口后,
    // 對消息隊列的處理由一個消息隊列多個線程服務轉化為一個消息隊列一個線程服務
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for(MessageExt msg : list){
            System.out.println("消息:"+new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

事務消息

事務消息過程

image

事務消息狀態

  • 提交狀態:允許進入隊列,此消息與非事務消息無區別。

  • 回滾狀態:不允許進入隊列,此消息等同於未發送過。

  • 中間狀態:完成了half消息的發送,未對 MQ 進行二次狀態確認。

注意:事務消息僅與生產者有關,與消費者無關。

事務消息實現

// 事務消息使用的生產者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
// 添加本地事務對應的監聽
producer.setTransactionListener(new TransactionListener() {
    // 正常事務過程
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 提交狀態
        return LocalTransactionState.COMMIT_MESSAGE;
        // 回滾狀態
        return LocalTransactionState.ROLLBACK_MESSAGE;
        // 中間狀態
        return LocalTransactionState.UNKNOW;

    }
    // 事務補償過程
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 事務補償過程必須保證服務器正在運行,否則將無法進行正常的事務補償
        return LocalTransactionState.COMMIT_MESSAGE;
        // return null; 
    }
});
producer.start();
Message msg = new Message("topic8", ("事務消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結果:"+result);
producer.shutdown();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM