最全的RocketMQ學習指南,程序員必備的中間件技能


一、簡介

RocketMq是阿里開發出來的一個消息中間件,后捐獻給Apache。官網上是這樣介紹的:
 簡介

Apache RocketMQ™ is a unified messaging engine, lightweight data processing platform.

RocketMQ是一個統一的處理消息引擎,輕量級的數據處理平台。

  • 低延遲,在高壓下,1毫秒內的響應延遲超過99.6%。
  • 高可用,具有跟蹤和審核功能
  • 萬億級消息容量保證
  • 自最新的4.1版本以來,使用新的開放式分布式消息傳遞和流媒體標准
  • 批量傳輸,多功能集成,以提高吞吐量
  • 如果空間足夠,可以在不損失性能的情況下存盤

二、基本概念

1、 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一台服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於不同的 Broker。Message Queue 用於存儲消息的物理地址,每個Topic中的消息地址存儲於多個 Message Queue 中。ConsumerGroup 由多個Consumer 實例構成。

2、 消息生產者(Producer)

負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。

3、 消息消費者(Consumer)

負責消費消息,一般是后台系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

4、 主題(Topic)

表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。

5、 代理服務器(Broker Server)

消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。

6、 名字服務(Name Server)

名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。

7、 拉取式消費(Pull Consumer)

Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。

8、 推動式消費(Push Consumer)

Consumer消費的一種類型,該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。

9、 生產者組(Producer Group)

同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。

10、 消費者組(Consumer Group)

同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

11、 集群消費(Clustering)

集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。

12、 廣播消費(Broadcasting)

廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。

13、 普通順序消息(Normal Ordered Message)

普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

14、 嚴格順序消息(Strictly Ordered Message)

嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

15、 消息(Message)

消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。

16、 標簽(Tag)

為消息設置的標志,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。

三、架構設計

1、技術架構

 架構

RocketMQ架構上主要分為四部分,如上圖所示:

  • Producer:消息發布的角色,支持分布式集群方式部署。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。

  • Consumer:消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。

  • NameServer:NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker集群的注冊信息並且保存下來作為路由信息的基本數據。然后提供心跳檢測機制,檢查Broker是否還存活;路由信息管理,每個NameServer將保存關於Broker集群的整個路由信息和用於客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一台NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態感知Broker的路由的信息。

  • BrokerServer:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模塊。

  1. Remoting Module:整個Broker的實體,負責處理來自clients端的請求。
  2. Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱信息
  3. Store Service:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。
  4. HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
  5. Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。
     架構

2、部署架構

 架構

RocketMQ 網絡部署特點

  • NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。

  • Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。 注意:當前RocketMQ版本在部署架構上支持一Master多Slave,但只有BrokerId=1的從服務器才會參與消息的讀負載。

  • Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。

  • Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master服務器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產生讀I/O),以及從服務器是否可讀等因素建議下一次是從Master還是Slave拉取。

結合部署架構圖,描述集群工作流程:

  • 啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。
  • Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
  • 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
  • Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
  • Consumer跟Producer類似,跟其中一台NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。

四、示例

1、 基本樣例

在基本樣例中我們提供如下的功能場景:

  • 使用RocketMQ發送三種類型的消息:同步消息、異步消息和單向消息。其中前兩種消息是可靠的,因為會有發送是否成功的應答。
  • 使用RocketMQ來消費接收到的消息。

1.1 加入依賴:

maven:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

gradle:

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

1.2 消息發送

1、Producer端發送同步消息
這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。

public class SyncProducer {
	public static void main(String[] args) throws Exception {
    	// 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 設置NameServer的地址
    	producer.setNamesrvAddr("localhost:9876");
    	// 啟動Producer實例
        producer.start();
    	for (int i = 0; i < 100; i++) {
    	    // 創建消息,並指定Topic,Tag和消息體
    	    Message msg = new Message("TopicTest" /* Topic */,
        	"TagA" /* Tag */,
        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 發送消息到一個Broker
            SendResult sendResult = producer.send(msg);
            // 通過sendResult返回消息是否成功送達
            System.out.printf("%s%n", sendResult);
    	}
    	// 如果不再發送消息,關閉Producer實例。
    	producer.shutdown();
    }
}

2、發送異步消息
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。

public class AsyncProducer {
	public static void main(String[] args) throws Exception {
    	// 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 設置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
    	// 啟動Producer實例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
	
	int messageCount = 100;
        // 根據消息數量實例化倒計時計算器
	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
    	for (int i = 0; i < messageCount; i++) {
                final int index = i;
            	// 創建消息,並指定Topic,Tag和消息體
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收異步返回結果的回調
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
      	              System.out.printf("%-10d Exception %s %n", index, e);
      	              e.printStackTrace();
                    }
            	});
    	}
	// 等待5s
	countDownLatch.await(5, TimeUnit.SECONDS);
    	// 如果不再發送消息,關閉Producer實例。
    	producer.shutdown();
    }
}

3、單向發送消息
這種方式主要用在不特別關心發送結果的場景,例如日志發送。


public class OnewayProducer {
	public static void main(String[] args) throws Exception{
    	// 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 設置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
    	// 啟動Producer實例
        producer.start();
    	for (int i = 0; i < 100; i++) {
        	// 創建消息,並指定Topic,Tag和消息體
        	Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 發送單向消息,沒有任何返回結果
        	producer.sendOneway(msg);

    	}
    	// 如果不再發送消息,關閉Producer實例。
    	producer.shutdown();
    }
}

1.3 消費消息

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {

    	// 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

    	// 設置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

    	// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe("TopicTest", "*");
    	// 注冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
	}
}

1.4 發送延時消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
   public static void main(String[] args) throws Exception {
      // 實例化一個生產者來產生延時消息
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // 啟動生產者
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // 設置延時等級3,這個消息將在10s之后發送(現在只支持固定的幾個時間,詳看delayTimeLevel)
          message.setDelayTimeLevel(3);
          // 發送消息
          producer.send(message);
      }
       // 關閉生產者
      producer.shutdown();
  }
}

1.5 批量消息

批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //處理error
}

復雜度只有當你發送大批量時才會增長,你可能不確定它是否超過了大小限制(4MB)。這時候你最好把你的消息列表分割一下:

public class ListSplitter implements Iterator<List<Message>> { 
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) { 
        this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size(); 
    }
    @Override public List<Message> next() { 
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex); 
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break; 
            } else {
                totalSize += tmpSize; 
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex); 
        currIndex = nextIndex;
        return subList;
    }
    private int getStartIndex() {
        Message currMessage = messages.get(currIndex); 
        int tmpSize = calcMessageSize(currMessage); 
        while(tmpSize > SIZE_LIMIT) {
            currIndex += 1;
            Message message = messages.get(curIndex); 
            tmpSize = calcMessageSize(message);
        }
        return currIndex; 
    }
    private int calcMessageSize(Message message) {
        int tmpSize = message.getTopic().length() + message.getBody().length(); 
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length(); 
        }
        tmpSize = tmpSize + 20; // 增加⽇日志的開銷20字節
        return tmpSize; 
    }
}
//把大的消息分裂成若干個小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //處理error
  }
}

1.6 過濾消息

在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。例如:


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消費者將接收包含TAGA或TAGB或TAGC的消息。但是限制是一個消息只能有一個標簽,這對於復雜的場景可能不起作用。在這種情況下,可以使用SQL表達式篩選消息。SQL特性可以通過發送消息時的屬性來進行計算。在RocketMQ定義的語法下,可以實現一些簡單的邏輯。下面是一個例子:


------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

1 基本語法

RocketMQ只定義了一些基本語法來支持這個特性。你也可以很容易地擴展它。

數值比較,比如:>,>=,<,<=,BETWEEN,=;
字符比較,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
邏輯符號 AND,OR,NOT;
常量支持類型為:

數值,比如:123,3.1415;
字符,比如:'abc',必須用單引號包裹起來;
NULL,特殊的常量
布爾值,TRUE 或 FALSE

只有使用push模式的消費者才能用使用SQL92標准的sql語句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

2、生產者樣例
發送消息時,你能通過putUserProperty來設置消息的屬性

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 設置一些屬性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

3、消費者樣例
用MessageSelector.bySql來使用sql篩選消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有訂閱的消息有這個屬性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

五、集群部署

1 集群搭建

1.1 單Master模式

這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用,可以用於本地測試。

1)啟動 NameServer

### 首先啟動Name Server
$ nohup sh mqnamesrv &
 
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)啟動 Broker

### 啟動Broker
$ nohup sh bin/mqbroker -n localhost:9876 &

### 驗證Name Server 是否啟動成功,例如Broker的IP為:192.168.1.2,且名稱為broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log 
The broker[broker-a, 192.169.1.2:10911] boot success...

1.2 多Master模式

一個集群無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:

  • 優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;

  • 缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。

1)啟動NameServer

NameServer需要先於Broker啟動,且如果在生產環境使用,為了保證高可用,建議一般規模的集群啟動3個NameServer,各節點的啟動命令相同,如下:

### 首先啟動Name Server
$ nohup sh mqnamesrv &
 
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)啟動Broker集群

### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
 
### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &

...

如上啟動命令是在單個NameServer情況下使用的。對於多個NameServer的集群,Broker啟動命令中-n后面的地址列表用分號隔開即可,例如 192.168.1.1:9876;192.161.2:9876。

1.3 多Master多Slave模式-異步復制

每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:

  • 優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;

  • 缺點:Master宕機,磁盤損壞情況下會丟失少量消息。

1)啟動NameServer

### 首先啟動Name Server
$ nohup sh mqnamesrv &
 
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)啟動Broker集群

### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
 
### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
 
### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
 
### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

1.4 多Master多Slave模式-同步雙寫

每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:

  • 優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;

  • 缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。

1)啟動NameServer


### 首先啟動Name Server
$ nohup sh mqnamesrv &
 
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)啟動Broker集群

### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
 
### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
 
### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
 
### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上Broker與Slave配對是通過指定相同的BrokerName參數來配對,Master的BrokerId必須是0,Slave的BrokerId必須是大於0的數。另外一個Master下面可以掛載多個Slave,同一Master下的多個Slave通過指定不同的BrokerId來區分。$ROCKETMQ_HOME指的RocketMQ安裝目錄,需要用戶自己設置此環境變量。

2 mqadmin管理工具

注意:

  • 執行命令方法:./mqadmin {command} {args}
  • 幾乎所有命令都需要配置-n表示NameServer地址,格式為ip:port
  • 幾乎所有命令都可以通過-h獲取幫助
  • 如果既有Broker地址(-b)配置項又有clusterName(-c)配置項,則優先以Broker地址執行命令,如果不配置Broker地址,則對集群中所有主機執行命令,只支持一個Broker地址。-b格式為ip:port,port默認是10911
  • 在tools下可以看到很多命令,但並不是所有命令都能使用,只有在MQAdminStartup中初始化的命令才能使用,你也可以修改這個類,增加或自定義命令
  • 由於版本更新問題,少部分命令可能未及時更新,遇到錯誤請直接閱讀相關命令源碼

六、FAQ

1、新的consumer從哪里消費?

  • 如果topic里面的消息是三天以內的,新consumer從所有消息的第一條開始消費
  • 如果有超過三天的消息,新consumer從往前遞推第三天的那條消息開始
  • 如果是consumer重啟,從它上次的位置繼續

2、消費失敗了怎么處理?

如果返回 ReconsumerLater,null或者拋出異常,這條消息會重試,最多16次

3、失敗的消息怎么查詢?

  • 根據topic和時間片查詢
  • 根據topic和messageId查詢
  • 根據topic和messageKey查詢

4、消息只傳遞一次嗎?

rocketMQ能保證所有的消息至少執行一次,在大部分情況下,消息不會重復消費,但不是絕對,要自己做冪等設計

5、每條消息保存多久?

3天

6、message body的大小限制?

256kb

7、怎么設置consumer線程數?

consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);

8、常見錯誤

  • 一個jvm程序只能有一個生產者/消費者實例;
  • fastjson版本不能太低;
  • topic要提前新建好,並保證有權限。

9、為什么是RocketMQ?

根據我們的研究,隨着隊列和虛擬主題的增加,ActiveMQ IO模塊達到了一個瓶頸。我們試圖通過節流、斷路器或降質來解決這個問題,但效果並不理想。所以我們開始關注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。

在這種情況下,我們決定發明一個新的消息傳遞引擎來處理更廣泛的用例集,從傳統的發布/訂閱場景到高容量實時零損失容忍事務系統。

下表展示了RocketMQ、ActiveMQ和Kafka(根據awesome java,Apache最流行的消息傳遞解決方案)之間的比較:

產品 語言支持 協議和規范 消息有序 定時支持 批量支持 廣播 消息篩選 消息觸發 消息存儲 信息追溯 優先級 高可用性和故障切換 信息追蹤 可配置 管理工具
ActiveMQ Java, .NET, C++ etc. Push model, support OpenWire, STOMP, AMQP, MQTT, JMS Exclusive Consumer or Exclusive Queues can ensure ordering Supported Not Supported Supported Supported Not Supported Supports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB Supported Supported Supported, depending on storage,if using kahadb it requires a ZooKeeper server Not Supported The default configuration is low level, user need to optimize the configuration parameters Supported
Kafka Java, Scala etc. Pull model, support TCP Ensure ordering of messages within a partition Not Supported Supported, with async producer Not Supported Supported, you can use Kafka Streams to filter messages Not Supported High performance file storage Supported offset indicate Not Supported Supported, requires a ZooKeeper server Not Supported Kafka uses key-value pairs format for configuration. These values can be supplied either from a file or programmatically. Supported, use terminal command to expose core metrics
RocketMQ Java, C++, Go Pull model, support TCP, JMS, OpenMessaging Ensure strict ordering of messages,and can scale out gracefully Supported Supported, with sync mode to avoid message loss Supported Supported, property filter expressions based on SQL92 Supported High performance and low latency file storage Supported timestamp and offset two indicates Not Supported Supported, Master-Slave model, without another kit Supported Work out of box,user only need to pay attention to a few configurations Supported, rich web and terminal command to expose core metrics

歡迎關注公眾號:豐極,更多技術學習分享。

參考網址:

http://rocketmq.apache.org/

http://rocketmq.apache.org/docs/quick-start/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM