RocketMQ入門教程


一、RocketMQ 是什么

     Github 上關於 RocketMQ 的介紹:RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的消息中間件。具有以下特性:

  1. 支持發布/訂閱(Pub/Sub)和點對點(P2P)消息模型

  2. 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞

  3. 支持拉(pull)和推(push)兩種消息模式

  4. 單一隊列百萬消息的堆積能力

  5. 支持多種消息協議,如 JMS、MQTT 等

  6. 分布式高可用的部署架構,滿足至少一次消息傳遞語義

  7. 提供 docker 鏡像用於隔離測試和雲集群部署

  8. 提供配置、指標和監控等功能豐富的 Dashboard

對於這些特性描述,大家簡單過一眼就即可,深入學習之后自然就明白了。

專業術語

Producer

消息生產者,生產者的作用就是將消息發送到 MQ,生產者本身既可以產生消息,如讀取文本信息等。也可以對外提供接口,由外部應用來調用接口,再由生產者將收到的消息發送到 MQ。

Producer Group

生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。在這里可以不用關心,只要知道有這么一個概念即可。

Consumer

消息消費者,簡單來說,消費 MQ 上的消息的應用程序就是消費者,至於消息是否進行邏輯處理,還是直接存儲到數據庫等取決於業務需要。

Consumer Group

消費者組,和生產者類似,消費同一類消息的多個 consumer 實例組成一個消費者組。

Topic

Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那么就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。

Message

Message 是消息的載體。一個 Message 必須指定 topic,相當於寄信的地址。Message 還有一個可選的 tag 設置,以便消費端可以基於 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 broker 上的消息,方便在開發過程中診斷問題。

Tag

標簽可以被認為是對 Topic 進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。

Broker

Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產者的消息,儲存以及為消費者拉取消息的請求做好准備。

Name Server

Name Server 為 producer 和 consumer 提供路由信息。

二、RocketMQ 架構

 

 

由這張圖可以看到有四個集群,分別是 NameServer 集群、Broker 集群、Producer 集群和 Consumer 集群:
  1. NameServer: 提供輕量級的服務發現和路由。 每個 NameServer 記錄完整的路由信息,提供等效的讀寫服務,並支持快速存儲擴展。
  2. Broker: 通過提供輕量級的 Topic 和 Queue 機制來處理消息存儲,同時支持推(push)和拉(pull)模式以及主從結構的容錯機制。
  3. Producer:生產者,產生消息的實例,擁有相同 Producer Group 的 Producer 組成一個集群。
  4. Consumer:消費者,接收消息進行消費的實例,擁有相同 Consumer Group 的Consumer 組成一個集群。

     簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結構,它們之間會進行數據同步,即 Date Sync。同時每個 Broker 與NameServer 集群中的所有節點建立長連接,定時注冊 Topic 信息到所有 NameServer 中。

    Producer 與 NameServer 集群中的其中一個節點(隨機選擇)建立長連接,定期從 NameServer 獲取 Topic 路由信息,並向提供 Topic 服務的 Broker Master 建立長連接,且定時向 Broker 發送心跳。Producer 只能將消息發送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave建立長連接,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。

 

附加說明:

 

    (1)NameServer是一個幾乎無狀態的節點,可集群部署,節點之間無任何信息同步

    (2)Broker部署相對復雜,Broker氛圍Master與Slave,一個Master可以對應多個Slaver,但是一個Slaver只能對應一個Master,Master與Slaver的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有的NameServer

    (3)Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Produce完全無狀態,可集群部署

    (4)Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master、Slaver建立長連接,且定時向Master、Slaver發送心跳。Consumer即可從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定

 

三、RocketMQ 集群部署模式

   0.  單 master 模式

        也就是只有一個 master 節點,稱不上是集群,一旦這個 master 節點宕機,那么整個服務就不可用,適合個人學習使用。

    1. 多 master 模式
      多個 master 節點組成集群,單個 master 節點宕機或者重啟對應用沒有影響。
      優點:所有模式中性能最高
      缺點:單個 master 節點宕機期間,未被消費的消息在節點恢復之前不可用,消息的實時性就受到影響。
      注意:使用同步刷盤可以保證消息不丟失,同時 Topic 相對應的 queue 應該分布在集群中各個節點,而不是只在某各節點上,否則,該節點宕機會對訂閱該 topic 的應用造成影響。
    2. 多 master 多 slave 異步復制模式
      在多 master 模式的基礎上,每個 master 節點都有至少一個對應的 slave。master
      節點可讀可寫,但是 slave 只能讀不能寫,類似於 mysql 的主備模式。
      優點: 在 master 宕機時,消費者可以從 slave 讀取消息,消息的實時性不會受影響,性能幾乎和多 master 一樣。
      缺點:使用異步復制的同步方式有可能會有消息丟失的問題。
    3. 多 master 多 slave 同步雙寫模式
      同多 master 多 slave 異步復制模式類似,區別在於 master 和 slave 之間的數據同步方式。
      優點:同步雙寫的同步模式能保證數據不丟失。
      缺點:發送單個消息 RT 會略長,性能相比異步復制低10%左右。
      刷盤策略:同步刷盤和異步刷盤(指的是節點自身數據是同步還是異步存儲)
      同步方式:同步雙寫和異步復制(指的一組 master 和 slave 之間數據的同步)
      注意:要保證數據可靠,需采用同步刷盤和同步雙寫的方式,但性能會較其他方式低。

四、入門程序

1. 下載

http://rocketmq.apache.org/dowloading/releases/

選擇Binary包

這里解壓縮后放到了D盤, 且為了操作方便, 把文件夾從rocketmq-all-4.5.0-bin-release改名為RocketMQ

默認設置占用內存很大, 如果不是土豪配置需要修改一下

NameServer設置: D:\RocketMQ\bin\runserver.cmd

修改為512m 512m 256m即可

Broker設置: D:\RocketMQ\bin\runbroker.cmd

2. 設置環境變量

桌面的計算機上點擊右鍵 -> 屬性

3. 啟動NameServer

在D:\RocketMQ\bin目錄下啟動命令行, 執行 runserver.cmd

4. 啟動Broker

執行mqbroker.cmd -n localhost:9876

注意圖中, Broker注冊的地址是192.168.12.22:10911, 接下來需要用到

5. 創建Topic

執行mqadmin.cmd updateTopic -n localhost:9876 -b 192.168.12.22:10911 -t demo

其中用-b指定Broker, 即步驟4中顯示的地址

開啟MQ這三個步驟如果出現閃退、找不到主類等問題,可以參考文末連接,有詳細解決方案

6. 開發生產者Producer

建立一個SpringBoot項目, 添加依賴

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.4.0</version>
</dependency>

創建ProducerService

package com.mq.rocketmq.service;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Service
public class ProducerService {


    private DefaultMQProducer producer = null;

    @PostConstruct
    public void initMQProducer() {
        producer = new DefaultMQProducer("defaultGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.setRetryTimesWhenSendFailed(3);

        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public boolean send(String topic, String tags, String content) {
        Message msg = new Message(topic, tags, "", content.getBytes());
        try {
            producer.send(msg);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    @PreDestroy
    public void shutDownProducer() {
        if(producer != null) {
            producer.shutdown();
        }
    }

}

測試

package com.mq.rocketmq;

import com.mq.rocketmq.service.ProducerService;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.jupiter.api.Assertions.assertTrue;


@RunWith(SpringRunner.class)
@SpringBootTest
class RocketmqApplicationTests {

    @Autowired
    private ProducerService producerService;

    @Test
    public void contextLoads() {
        boolean result = producerService.send("demo", "TAG-A", "Hello RocketMQ2");
        assertTrue(result);

    }

}

7. 開發消費者

創建ConsumerService

package com.mq.rocketmq.service;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;

@Service
public class ConsumerService {

    private DefaultMQPushConsumer consumer = null;

    @PostConstruct
    public void initMQConsumer() {
        consumer = new DefaultMQPushConsumer("defaultGroup");
        consumer.setNamesrvAddr("localhost:9876");
        try {
            consumer.subscribe("demo", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println("Message Received: " + new String(msg.getBody()));

                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void shutDownConsumer() {
        if (consumer != null) {
            consumer.shutdown();
        }

    }
}

啟動項目

 

 

原文出處:

https://www.cnblogs.com/zhangwuji/p/8134522.html

https://blog.csdn.net/autfish/article/details/89226461

參考:

windows下安裝rocketmq采坑全記錄, https://blog.csdn.net/kobewwf24/article/details/82712461

 





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM