手把手帶你了解消息中間件(3)——RocketMQ


一、RocketMQ簡介

  RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。

二、RocketMQ架構


  如圖所示為RocketMQ基本的部署結構,主要分為NameServer集群、Broker集群、Producer集群和Consumer集群四個部分。

  Broker在啟動的時候會去向NameServer注冊並且定時發送心跳,Producer在啟動的時候會到NameServer上去拉取Topic所屬的Broker具體地址,然后向具體的Broker發送消息

1、NameServer

  NameServer的作用是Broker的注冊中心。

  每個NameServer節點互相之間是獨立的,沒有任何信息交互,也就不存在任何的選主或者主從切換之類的問題,因此NameServer是很輕量級的。單個NameServer節點中存儲了活躍的Broker列表(包括master和slave),這里活躍的定義是與NameServer保持有心跳。

2、Topic、Tag、Queue、GroupName

  Topic 與 Tag 都是業務上用來歸類的標識,區分在於 Topic 是一級分類,而 Tag 可以理解為是二級分類

1) Topic(話題)

  Topic是生產者在發送消息和消費者在拉取消息的類別。Topic與生產者和消費者之間的關系非常松散。一個生產者可以發送不同類型Topic的消息。消費者組可以訂閱一個或多個主題,只要該組的實例保持其訂閱一致即可。

  我們可以理解為Topic是第一級消息類型,比如一個電商系統的消息可以分為:交易消息、物流消息等,一條消息必須有一個Topic。

2) Tag(標簽)

  意思就是子主題,為用戶提供了額外的靈活性。有了標簽,方便RocketMQ提供的查詢功能。

  可以理解為第二級消息類型,交易創建消息,交易完成消息..... 一條消息可以沒有Tag

3) Queue(隊列)

  一個topic下,可以設置多個queue(消息隊列),默認4個隊列。當我們發送消息時,需要要指定該消息的topic。

  RocketMQ會輪詢該topic下的所有隊列,將消息發送出去。

  在 RocketMQ 中,所有消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用 Offset 來訪問,offset 為 java long 類型,64 位,理論上在 100年內不會溢出,所以認為是長度無限。

  也可以認為 Message Queue 是一個長度無限的數組,Offset 就是下標。

4) groupName(組名稱)

  RocketMQ中也有組的概念。代表具有相同角色的生產者組合或消費者組合,稱為生產者組或消費者組。

  作用是在集群HA的情況下,一個生產者down之后,本地事務回滾后,可以繼續聯系該組下的另外一個生產者實例,不至於導致業務走不下去。在消費者組中,可以實現消息消費的負載均衡和消息容錯目標。

  有了GroupName,在集群下,動態擴展容量很方便。只需要在新加的機器中,配置相同的GroupName。啟動后,就立即能加入到所在的群組中,參與消息生產或消費。

3、Broker-存放消息

  Broker是具體提供業務的服務器,單個Broker節點與所有的NameServer節點保持長連接及心跳,定時(每隔30s)注冊Topic信息到所有Name Server。Name Server定時(每隔10s)掃描所有存活broker的連接,如果Name Server超過2分鍾沒有收到心跳,則Name Server斷開與Broker的連接。底層的通信和連接都是基於Netty實現的。

  負載均衡:Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上,會自動輪詢當前所有可發送的broker ,盡量平均分布到所有隊列中,最終效果就是所有消息都平均落在每個Broker上。

  高可用:Broker中分masterslave兩種角色,每個master可以對應多個slave,但一個slave只能對應一個master,master和slave通過指定相同的Brokername組成,其中不同的BrokerId==0 是master,非0是slave。

  高可靠並發讀寫服務:master和slave之間的同步方式分為同步雙寫和異步復制,異步復制方式master和slave之間雖然會存在少量的延遲,但性能較同步雙寫方式要高出10%左右。

Topic、Broker、queue三者間的關系

4、Producer-生產消息

1) 與nameserver的關系

  單個Producer和一台NameServer節點(隨機選擇)保持長連接,定時查詢topic配置信息,如果該NameServer掛掉,生產者會自動連接下一個NameServer,直到有可用連接為止,並能自動重連。與NameServer之間沒有心跳。

2) 與broker的關系

  單個Producer和與其關聯的所有broker保持長連接,並維持心跳。默認情況下消息發送采用輪詢方式,會均勻發到對應Topic的所有queue中。

5、Consumer-消費消息

1) 與nameserver的關系

  單個Consumer和一台NameServer保持長連接,定時查詢topic配置信息,如果該NameServer掛掉,消費者會自動連接下一個NameServer,直到有可用連接為止,並能自動重連。與NameServer之間沒有心跳。

2) 與broker的關系

  單個Consumer和與其關聯的所有broker保持長連接,並維持心跳,失去心跳后,則關閉連接,並向該消費者分組的所有消費者發出通知,分組內消費者重新分配隊列繼續消費。

5.1 消費者類型
  • 1) pull consume
      Consumer 的一種,應用通常通過 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立刻回調 Listener 接口方法,類似於activemq的方式
  • 2) push consume
      Consumer 的一種,應用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制
5.2 消費模式
  • 1) 集群模式

  在默認情況下,就是集群消費,此時消息發出去后將只有一個消費者能獲取消息。

  • 2) 廣播模式

  廣播消費,一條消息被多個Consumer消費。消息會發給Consume Group中的所有消費者進行消費。

三、RocketMQ的特性

1、消息順序

  消息的順序指的是消息消費時,能按照發送的順序來消費。

  RocketMQ是通過將“相同ID的消息發送到同一個隊列,而一個隊列的消息只由一個消費者處理“來實現順序消息

2、消息重復

1) 消息重復的原因

  消息領域有一個對消息投遞的QoS(服務質量)定義,分為:最多一次(At most once)、至少一次(At least once)、僅一次( Exactly once)。

  MQ產品都聲稱自己做到了At least once。既然是至少一次,就有可能發生消息重復。

  有很多原因導致,比如:網絡原因閃斷,ACK返回失敗等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者

  不同的消息隊列發送的確認信息形式不同:RocketMQ返回一個CONSUME_SUCCESS成功標志,RabbitMQ是發送一個ACK確認消息

2) 消息去重
  • 1) 去重原則:使用業務端邏輯保持冪等性

  冪等性:就是用戶對於同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產生了副作用,數據庫的結果都是唯一的,不可變的。

  • 2) 只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣,需要業務端來實現。

  去重策略:保證每條消息都有唯一編號(比如唯一流水號),且保證消息處理成功與去重表的日志同時出現。

四、RocketMQ的應用場景

1、削峰填谷

  比如如秒殺等大型活動時會帶來較高的流量脈沖,如果沒做相應的保護,將導致系統超負荷甚至崩潰。如果因限制太過導致請求大量失敗而影響用戶體驗,可以利用MQ 超高性能的消息處理能力來解決。

2、異步解耦

  通過上、下游業務系統的松耦合設計,比如:交易系統的下游子系統(如積分等)出現不可用甚至宕機,都不會影響到核心交易系統的正常運轉。

3、順序消息

  FIFO原理類似,MQ提供的順序消息即保證消息的先進先出,可以應用於交易系統中的訂單創建、支付、退款等流程。

4、分布式事務消息

  比如阿里的交易系統、支付紅包等場景需要確保數據的最終一致性,需要引入 MQ 的分布式事務,既實現了系統之間的解耦,又可以保證最終的數據一致性。

五、RocketMQ集群部署方式

1、單Mater模式

  優點:配置簡單,方便部署

  缺點:風險較大,一旦Broker重啟或者宕機,會導致整個服務不可用

2、多Master模式

  一個集群無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master

  優點:配置簡單,單個Master宕機重啟對應用沒有影響。消息不會丟失

  缺點:單台機器宕機期間,這台機器上沒有被消費的消息在恢復之前不可訂閱,消息實時性會受到影響。

3、多Master多Slave模式(異步)

  每個Master配置一個Slave,采用異步復制方式,主備有短暫消息延遲

  優點:因為Master 宕機后,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。

  缺點:Master宕機后,會丟失少量信息

4、多Master多Slave模式(同步)

  每個Master配置一個Slave,采用同步雙寫方式,只有主和備都寫成功,才返回成功

  優點:數據與服務都無單點, Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高

  缺點:性能比異步復制模式略低,大約低 10%左右,發送單個消息的 RT會略高。目前主宕機后,備機不能自動切換為主機,后續會支持自動切換功能

六、RocketMQ的消息類型

消息發送步驟:

消息消費步驟:


  創建一個maven工程,導入依賴

    <dependencies>
        <!--rocket-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
       <!--順序消息中,模擬了一個消息集合,加入了lombok-->
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
    </dependencies>

1、普通消息

點擊查看生產者代碼 ```java /** * 普通消息生產者 */ public class Producer {
public static void main(String[] args) throws Exception {

// 創建一個消息發送入口對象,主要用於消息發送,指定生產者組
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
// 設置NameServe地址,如果是集群環境,用分號隔開
producer.setNamesrvAddr("127.0.0.1:9876");
// 啟動並創建消息發送組件
producer.start();
// topic的名字
String topic = "rocketDemo1";
// 標簽名
String taget = "tag";
// 要發送的數據
String body = "hello,RocketMq";
Message message = new Message(topic,taget,body.getBytes());
// 發送消息
SendResult result = producer.send(message);
System.out.println(result);
// 關閉消息發送對象
producer.shutdown();
}
}

</details>
 <details>
<summary>點擊查看消費者代碼</summary>
```java
/**
 * 普通消息消費者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
//        創建一個消費管理對象,並創建消費者組名字
        DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("ConsumerGroup");
//        設置NameServer地址,如果是集群環境,用逗號分隔
        consumerGroup.setNamesrvAddr("127.0.0.1:9876");
//        設置要讀取的消息主題和標簽
        consumerGroup.subscribe("rocketDemo1", "*");
//      設置回調函數,處理消息
        //注意:MessageListenerConcurrently     -- 並行消費監聽
        consumerGroup.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    //讀取消息記錄
                    for (MessageExt messageExt : msgs) {
                        //獲取消息主題
                        String topic = messageExt.getTopic();
                        //獲取消息標簽
                        String tags = messageExt.getTags();
                        //獲取消息體內容
                        String body = new String(messageExt.getBody(), "UTF-8");
                        System.out.println("topic:" + topic + ",tags:" + tags + ",body:" + body);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //返回消費成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//      運行消息消費對象
        consumerGroup.start();
    }
}
#### 2、順序消息   消息有序指的是可以按照消息的發送順序來消費。RocketMQ是通過將“相同ID的消息發送到同一個隊列,而一個隊列的消息只由一個消費者處理“來實現順序消息 。 **如何保證順序** - 1) 消息被發送時保持順序:發送時保持順序意味着對於有順序要求的消息,用戶應該在同一個線程中采用同步的方式發送。 - 2) 消息被存儲時保持和發送的順序一致:存儲保持和發送的順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。 - 3) 消息被消費時保持和存儲的順序一致:消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。
點擊查看模擬消息代碼 ```java /** * 模擬消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order {
private Long orderId;
private String desc;

public static List<Order> buildOrders(){
    List<Order> list = new ArrayList<Order>();
    Order order1001a = new Order(1001L,"創建");
    Order order1004a = new Order(1004L,"創建");
    Order order1006a = new Order(1006L,"創建");
    Order order1009a = new Order(1009L,"創建");
    list.add(order1001a);
    list.add(order1004a);
    list.add(order1006a);
    list.add(order1009a);
    Order order1001b = new Order(1001L,"付款");
    Order order1004b = new Order(1004L,"付款");
    Order order1006b = new Order(1006L,"付款");
    Order order1009b = new Order(1009L,"付款");
    list.add(order1001b);
    list.add(order1004b);
    list.add(order1006b);
    list.add(order1009b);
    Order order1001c = new Order(1001L,"完成");
    Order order1006c = new Order(1006L,"完成");
    list.add(order1001c);
    list.add(order1006c);
    return list;
}

}

</details>
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * Producer端確保消息順序唯一要做的事情就是將消息路由到特定的隊列,
 * 在RocketMQ中,通過MessageQueueSelector來實現分區的選擇
 */
public class ProducerOrder {
        //nameserver地址
        private static String namesrvaddress="127.0.0.1:9876;";

        public static void main(String[] args) throws Exception {
            //創建DefaultMQProducer
            DefaultMQProducer producer = new DefaultMQProducer("order_producer_name");
            //設置namesrv地址
            producer.setNamesrvAddr(namesrvaddress);
            //啟動Producer
            producer.start();
            List<Order> orderList = Order.buildOrders();
            for (Order order : orderList) {
                String body = order.toString();
                //創建消息
                Message message = new Message("orderTopic","order",body.getBytes());
                //發送消息
                SendResult sendResult = producer.send(
                        message,
                        new MessageQueueSelector() {
                            /**
                             *
                             * @param mqs topic中的隊列集合
                             * @param msg 消息對象
                             * @param arg 業務參數
                             * @return
                             */
                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                //參數是訂單id號
                                Long orderId = (Long) arg;
                                //確定選擇的隊列的索引
                                long index = orderId % mqs.size();
                                return mqs.get((int) index);
                            }
                        },
                        order.getOrderId());
                System.out.println("發送結果="+sendResult);
            }
            //關閉Producer
            producer.shutdown();
        }
    }
點擊查看消費者代碼 ```java /** * 消費者端實現MessageListenerOrderly介口監聽消息來實現順序消息 */ public class ConsumerOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //從第一個開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("orderTopic","*");
	//MessageListenerOrderly 順序消費
    consumer.registerMessageListener(new MessageListenerOrderly() {
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("當前線程:"+Thread.currentThread().getName()+",接收消息:"+new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer started.%n");
}

}

</details>
#### 3、延遲消息
&emsp;&emsp;RocketMQ 支持定時(延遲)消息,但是不支持任意時間精度,僅支持特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。  
  
&emsp;&emsp;延遲消息可以在生產者中直接設置,也可以在rocketmq的配置文件broker.conf中配置:messageDelayLevel=1s|5s|1m|2m|1h|2h......
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * 延遲消息 生產者
 */
public class ProducerDelay {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設置nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啟
        producer.start();
        //創建消息對象
        Message message = new Message("delayTopic","delay","hello world".getBytes());
        //設置延遲時間級別
        message.setDelayTimeLevel(2);
        //發送消息
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}
點擊查看消費者代碼 ```java /** * 延遲消息 消費者 */ public class ConsumerDelay {
public static void main(String[] args) throws Exception {
    //創建消費者對象
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
    //設置nameserver
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //設置主題和tag
    consumer.subscribe("delayTopic","*");
    //注冊消息監聽
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("消息ID:"+msg.getMsgId()+"發送時間:"+new Date(msg.getStoreTimestamp())+",延遲時間:"+(System.currentTimeMillis()-msg.getStoreTimestamp()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //開啟消費者
    consumer.start();
    System.out.println("消費者啟動");
}

}

</details>
#### 4、批量發送消息
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * 批量 生產者
 */
public class ProducerBatch {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設置nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啟
        producer.start();
        //創建消息對象  集合
        String topic = "batchTopic";
        String tag = "batch";
        List<Message> messageList = new ArrayList<Message>();
        Message message1 = new Message(topic,tag,"hello world1".getBytes());
        Message message2 = new Message(topic,tag,"hello world2".getBytes());
        Message message3 = new Message(topic,tag,"hello world3".getBytes());
        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        //發送消息
        SendResult sendResult = producer.send(messageList);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}
點擊查看消費者代碼 ```java /** * 批量消費者 */ public class ConsumerBatch { public static void main(String[] args) throws Exception { //創建消費者對象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); //設置nameserver consumer.setNamesrvAddr("127.0.0.1:9876"); //設置主題和tag consumer.subscribe("batchTopic","*"); //注冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("消息ID:"+msg.getMsgId());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //開啟消費者
    consumer.start();
    System.out.println("消費者啟動");
}

}

</details>
#### 5、廣播消息
&emsp;&emsp;rocketmq默認采用的是集群消費,我們想要使用廣播消費,只需在消費者中加入`consumer.setMessageModel(MessageModel.BROADCASTING)`這段配置,`MessageModel.CLUSTERING`為集群模式,是默認的;
 <details>
<summary>點擊查看生產者代碼</summary>
```java
/**
 * 生產者
 */
public class ProducerBroadcast {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設置nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啟
        producer.start();
        //創建消息對象  集合
        String topic = "broadcastTopic";
        String tag = "broad";
        List<Message> messageList = new ArrayList<Message>();
        Message message1 = new Message(topic,tag,"hello world1".getBytes());
        Message message2 = new Message(topic,tag,"hello world2".getBytes());
        Message message3 = new Message(topic,tag,"hello world3".getBytes());
        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        //發送消息
        SendResult sendResult = producer.send(messageList);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}
點擊查看消費者1代碼 ```java /** * 消費者1 */ public class ConsumerBroadcast1 {
public static void main(String[] args) throws Exception {
    //創建消費者對象
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
    //設置nameserver
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //設置主題和tag
    consumer.subscribe("broadcastTopic","*");
    //設置消息模式 為 廣播模式
    consumer.setMessageModel(MessageModel.BROADCASTING);
    //注冊消息監聽
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("消費者1:消息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //開啟消費者
    consumer.start();
    System.out.println("消費者1啟動");
}

}

</details>
 <details>
<summary>點擊查看消費者2代碼</summary>
```java
/**
 * 消費者2
 */
public class ConsumerBroadcast2 {

    public static void main(String[] args) throws Exception {
        //創建消費者對象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        //設置nameserver
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //設置主題和tag
        consumer.subscribe("broadcastTopic","*");
        //設置消息模式 為 廣播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //注冊消息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消費者2:消息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //開啟消費者
        consumer.start();
        System.out.println("消費者2啟動");
    }
}
### 七、SpringBoot整合RocketMQ   創建一個maven工程,導入依賴 ```java org.springframework.boot spring-boot-starter-web 2.2.1.RELEASE org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2 org.springframework.boot spring-boot-starter-test 2.2.1.RELEASE test org.projectlombok lombok 1.16.22 ```
點擊查看模擬消息代碼 ```java /** * 模擬消息 */ @Data @AllArgsConstructor @NoArgsConstructor public class Order {
private Long orderId;
private String desc;

public static List<Order> buildOrders(){
    List<Order> list = new ArrayList<Order>();
    Order order1001a = new Order(1001L,"1001創建");
    Order order1004a = new Order(1004L,"1004創建");
    Order order1006a = new Order(1006L,"1006創建");
    Order order1009a = new Order(1009L,"1009創建");
    list.add(order1001a);
    list.add(order1004a);
    list.add(order1006a);
    list.add(order1009a);
    Order order1001b = new Order(1001L,"1001付款");
    Order order1004b = new Order(1004L,"1004付款");
    Order order1006b = new Order(1006L,"1006付款");
    Order order1009b = new Order(1009L,"1009付款");
    list.add(order1001b);
    list.add(order1004b);
    list.add(order1006b);
    list.add(order1009b);
    Order order1001c = new Order(1001L,"1001完成");
    Order order1006c = new Order(1006L,"1006完成");
    list.add(order1001c);
    list.add(order1006c);
    return list;
}

}

</details>
 <details>
<summary>點擊查看消息生產者代碼</summary>
```java
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 普通消息生產者
     */
    @Test
    public void testSend(){
        rocketMQTemplate.convertAndSend("testTopic","這是測試消息!");
    }
    
    /**
     * 延遲消息生產者
     */
    @Test
    public void testDelaySend(){
        SendResult sendResult = rocketMQTemplate.syncSend("testTopic",
                new GenericMessage("這是延遲測試消息!"+new Date()),
                10000,
                4);
        log.info("sendResult=="+sendResult);
    }

    /**
     * 順序消息 生產者
     */
    @Test
    public void testOrderlySend(){
        List<Order> orderList = Order.buildOrders();
        for (Order order : orderList) {
            //發送消息
            rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {

                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //參數是訂單id號
                    Long orderId = Long.valueOf((String)arg);
                    //確定選擇的隊列的索引
                    long index = orderId % mqs.size();
                    log.info("mqs is ::" + mqs.get((int) index));
                    return mqs.get((int) index);
                }
            });
            SendResult sendOrderly = rocketMQTemplate.syncSendOrderly("testTopicOrderLy",
                    new GenericMessage<String>(order.toString()), order.getOrderId().toString());
            log.info("發送結果="+sendOrderly+",orderid :"+order.getOrderId());
        }
    }
}
點擊查看普通|延遲消費者代碼 ```java /** * 普通、延遲消息 消費者代碼 */ @Component @RocketMQMessageListener(consumerGroup = "myConsumer", topic = "testTopic") public class RocketConsumer implements RocketMQListener {
public void onMessage(String message) {
    System.out.println("接收到消息:="+message);
}

}

</details>
 <details>
<summary>點擊查看順序消費者代碼</summary>
```java
/**
 * 順序消息 ,消費者
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "myConsumerOrderly", topic = "testTopicOrderLy",consumeMode = ConsumeMode.ORDERLY)
public class RocketConsumerOrderly implements RocketMQListener<String> {
    
    public void onMessage(String message) {
       log.info("當前線程:"+Thread.currentThread().getName()+",接收到消息:="+message);
    }
}
### 八、RocketMQ的安裝配置 ##### 1、配置系統環境變量;計算機/屬性/高級系統設置/環境變量/系統變量,新建系統變量ROCKETMQ_HOME=RocketMQ安裝路徑 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109132659277-1403218980.png) ##### 2、進入RocketMQ安裝目錄的bin目錄下,右鍵用記事本打開修改runserver.cmd文件 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109135526592-344591921.png) ##### 3、修改runbroker.cmd文件 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109135534570-287023226.png) ##### 4、cmd進入到MQ/bin目錄下啟動 ```java 1.啟動mqnamesrv.cmd start mqnamesrv.cmd ``` 成功的彈窗,此框勿關閉。 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109134804446-1433923488.png) ```java 2.啟動mqbroker.cmd start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true ``` 成功的彈窗,此框勿關閉。 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109134835168-2104617731.png) 注意:假如彈出提示框提示‘錯誤: 找不到或無法加載主類 xxxxxx’。打開runbroker.cmd,然后將‘%CLASSPATH%’加上英文雙引號。保存並重新執行start語句。 ![](https://img2018.cnblogs.com/blog/1900029/202001/1900029-20200109135553489-368231230.png) ##### 5、下載RocketMQ的可視化插件 - 1) 下載地址: https://github.com/apache/rocketmq-externals/releases
  • 2) 修改rocketmq-console\src\main\resources\application.properties,修改如下:

  • 3) cmd窗口執行:mvn clean package -Dmaven.test.skip=true

  • 4) jar包運行:java -jar rocketmq-console-ng-1.0.0.jar

  • 5) 測試輸入地址: http://127.0.0.1:8080/#/ops


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM