一、簡介
RocketMq是阿里開發出來的一個消息中間件,后捐獻給Apache。官網上是這樣介紹的:

Apache RocketMQ™ is a unified messaging engine, lightweight data processing platform.
RocketMQ是一個統一的處理消息引擎,輕量級的數據處理平台。
- 低延遲,在高壓下,1毫秒內的響應延遲超過99.6%。
- 高可用,具有跟蹤和審核功能
- 萬億級消息容量保證
- 自最新的4.1版本以來,使用新的開放式分布式消息傳遞和流媒體標准
- 批量傳輸,多功能集成,以提高吞吐量
- 如果空間足夠,可以在不損失性能的情況下存盤
二、基本概念
1、 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一台服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於不同的 Broker。Message Queue 用於存儲消息的物理地址,每個Topic中的消息地址存儲於多個 Message Queue 中。ConsumerGroup 由多個Consumer 實例構成。
2、 消息生產者(Producer)
負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
3、 消息消費者(Consumer)
負責消費消息,一般是后台系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
4、 主題(Topic)
表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。
5、 代理服務器(Broker Server)
消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
6、 名字服務(Name Server)
名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。
7、 拉取式消費(Pull Consumer)
Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。
8、 推動式消費(Push Consumer)
Consumer消費的一種類型,該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。
9、 生產者組(Producer Group)
同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。
10、 消費者組(Consumer Group)
同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。
11、 集群消費(Clustering)
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
12、 廣播消費(Broadcasting)
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
13、 普通順序消息(Normal Ordered Message)
普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
14、 嚴格順序消息(Strictly Ordered Message)
嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
15、 消息(Message)
消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。
16、 標簽(Tag)
為消息設置的標志,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
三、架構設計
1、技術架構

RocketMQ架構上主要分為四部分,如上圖所示:
-
Producer:消息發布的角色,支持分布式集群方式部署。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。
-
Consumer:消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。
-
NameServer:NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker集群的注冊信息並且保存下來作為路由信息的基本數據。然后提供心跳檢測機制,檢查Broker是否還存活;路由信息管理,每個NameServer將保存關於Broker集群的整個路由信息和用於客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一台NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態感知Broker的路由的信息。
-
BrokerServer:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模塊。
- Remoting Module:整個Broker的實體,負責處理來自clients端的請求。
- Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱信息
- Store Service:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。
- HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
- Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。

2、部署架構

RocketMQ 網絡部署特點
-
NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
-
Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。 注意:當前RocketMQ版本在部署架構上支持一Master多Slave,但只有BrokerId=1的從服務器才會參與消息的讀負載。
-
Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
-
Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master服務器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產生讀I/O),以及從服務器是否可讀等因素建議下一次是從Master還是Slave拉取。
結合部署架構圖,描述集群工作流程:
- 啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。
- Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
- 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
- Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
- Consumer跟Producer類似,跟其中一台NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
四、示例
1、 基本樣例
在基本樣例中我們提供如下的功能場景:
- 使用RocketMQ發送三種類型的消息:同步消息、異步消息和單向消息。其中前兩種消息是可靠的,因為會有發送是否成功的應答。
- 使用RocketMQ來消費接收到的消息。
1.1 加入依賴:
maven:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
gradle:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
1.2 消息發送
1、Producer端發送同步消息
這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
for (int i = 0; i < 100; i++) {
// 創建消息,並指定Topic,Tag和消息體
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發送消息到一個Broker
SendResult sendResult = producer.send(msg);
// 通過sendResult返回消息是否成功送達
System.out.printf("%s%n", sendResult);
}
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
}
2、發送異步消息
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根據消息數量實例化倒計時計算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 創建消息,並指定Topic,Tag和消息體
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收異步返回結果的回調
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
}
3、單向發送消息
這種方式主要用在不特別關心發送結果的場景,例如日志發送。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
for (int i = 0; i < 100; i++) {
// 創建消息,並指定Topic,Tag和消息體
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發送單向消息,沒有任何返回結果
producer.sendOneway(msg);
}
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
}
1.3 消費消息
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("TopicTest", "*");
// 注冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 標記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者實例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
1.4 發送延時消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 實例化一個生產者來產生延時消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 啟動生產者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 設置延時等級3,這個消息將在10s之后發送(現在只支持固定的幾個時間,詳看delayTimeLevel)
message.setDelayTimeLevel(3);
// 發送消息
producer.send(message);
}
// 關閉生產者
producer.shutdown();
}
}
1.5 批量消息
批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//處理error
}
復雜度只有當你發送大批量時才會增長,你可能不確定它是否超過了大小限制(4MB)。這時候你最好把你的消息列表分割一下:
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的開銷20字節
return tmpSize;
}
}
//把大的消息分裂成若干個小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//處理error
}
}
1.6 過濾消息
在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消費者將接收包含TAGA或TAGB或TAGC的消息。但是限制是一個消息只能有一個標簽,這對於復雜的場景可能不起作用。在這種情況下,可以使用SQL表達式篩選消息。SQL特性可以通過發送消息時的屬性來進行計算。在RocketMQ定義的語法下,可以實現一些簡單的邏輯。下面是一個例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
1 基本語法
RocketMQ只定義了一些基本語法來支持這個特性。你也可以很容易地擴展它。
數值比較,比如:>,>=,<,<=,BETWEEN,=;
字符比較,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
邏輯符號 AND,OR,NOT;
常量支持類型為:
數值,比如:123,3.1415;
字符,比如:'abc',必須用單引號包裹起來;
NULL,特殊的常量
布爾值,TRUE 或 FALSE
只有使用push模式的消費者才能用使用SQL92標准的sql語句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
2、生產者樣例
發送消息時,你能通過putUserProperty來設置消息的屬性
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 設置一些屬性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
3、消費者樣例
用MessageSelector.bySql來使用sql篩選消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有訂閱的消息有這個屬性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
五、集群部署
1 集群搭建
1.1 單Master模式
這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用,可以用於本地測試。
1)啟動 NameServer
### 首先啟動Name Server
$ nohup sh mqnamesrv &
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)啟動 Broker
### 啟動Broker
$ nohup sh bin/mqbroker -n localhost:9876 &
### 驗證Name Server 是否啟動成功,例如Broker的IP為:192.168.1.2,且名稱為broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a, 192.169.1.2:10911] boot success...
1.2 多Master模式
一個集群無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
-
優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
-
缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。
1)啟動NameServer
NameServer需要先於Broker啟動,且如果在生產環境使用,為了保證高可用,建議一般規模的集群啟動3個NameServer,各節點的啟動命令相同,如下:
### 首先啟動Name Server
$ nohup sh mqnamesrv &
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)啟動Broker集群
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...
如上啟動命令是在單個NameServer情況下使用的。對於多個NameServer的集群,Broker啟動命令中-n后面的地址列表用分號隔開即可,例如 192.168.1.1:9876;192.161.2:9876。
1.3 多Master多Slave模式-異步復制
每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
-
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;
-
缺點:Master宕機,磁盤損壞情況下會丟失少量消息。
1)啟動NameServer
### 首先啟動Name Server
$ nohup sh mqnamesrv &
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)啟動Broker集群
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
1.4 多Master多Slave模式-同步雙寫
每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:
-
優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
-
缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。
1)啟動NameServer
### 首先啟動Name Server
$ nohup sh mqnamesrv &
### 驗證Name Server 是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)啟動Broker集群
### 在機器A,啟動第一個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
### 在機器B,啟動第二個Master,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
### 在機器C,啟動第一個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
### 在機器D,啟動第二個Slave,例如NameServer的IP為:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
以上Broker與Slave配對是通過指定相同的BrokerName參數來配對,Master的BrokerId必須是0,Slave的BrokerId必須是大於0的數。另外一個Master下面可以掛載多個Slave,同一Master下的多個Slave通過指定不同的BrokerId來區分。$ROCKETMQ_HOME指的RocketMQ安裝目錄,需要用戶自己設置此環境變量。
2 mqadmin管理工具
注意:
- 執行命令方法:./mqadmin {command} {args}
- 幾乎所有命令都需要配置-n表示NameServer地址,格式為ip:port
- 幾乎所有命令都可以通過-h獲取幫助
- 如果既有Broker地址(-b)配置項又有clusterName(-c)配置項,則優先以Broker地址執行命令,如果不配置Broker地址,則對集群中所有主機執行命令,只支持一個Broker地址。-b格式為ip:port,port默認是10911
- 在tools下可以看到很多命令,但並不是所有命令都能使用,只有在MQAdminStartup中初始化的命令才能使用,你也可以修改這個類,增加或自定義命令
- 由於版本更新問題,少部分命令可能未及時更新,遇到錯誤請直接閱讀相關命令源碼
六、FAQ
1、新的consumer從哪里消費?
- 如果topic里面的消息是三天以內的,新consumer從所有消息的第一條開始消費
- 如果有超過三天的消息,新consumer從往前遞推第三天的那條消息開始
- 如果是consumer重啟,從它上次的位置繼續
2、消費失敗了怎么處理?
如果返回 ReconsumerLater,null或者拋出異常,這條消息會重試,最多16次
3、失敗的消息怎么查詢?
- 根據topic和時間片查詢
- 根據topic和messageId查詢
- 根據topic和messageKey查詢
4、消息只傳遞一次嗎?
rocketMQ能保證所有的消息至少執行一次,在大部分情況下,消息不會重復消費,但不是絕對,要自己做冪等設計
5、每條消息保存多久?
3天
6、message body的大小限制?
256kb
7、怎么設置consumer線程數?
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
8、常見錯誤
- 一個jvm程序只能有一個生產者/消費者實例;
- fastjson版本不能太低;
- topic要提前新建好,並保證有權限。
9、為什么是RocketMQ?
根據我們的研究,隨着隊列和虛擬主題的增加,ActiveMQ IO模塊達到了一個瓶頸。我們試圖通過節流、斷路器或降質來解決這個問題,但效果並不理想。所以我們開始關注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。
在這種情況下,我們決定發明一個新的消息傳遞引擎來處理更廣泛的用例集,從傳統的發布/訂閱場景到高容量實時零損失容忍事務系統。
下表展示了RocketMQ、ActiveMQ和Kafka(根據awesome java,Apache最流行的消息傳遞解決方案)之間的比較:
| 產品 | 語言支持 | 協議和規范 | 消息有序 | 定時支持 | 批量支持 | 廣播 | 消息篩選 | 消息觸發 | 消息存儲 | 信息追溯 | 優先級 | 高可用性和故障切換 | 信息追蹤 | 可配置 | 管理工具 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ActiveMQ | Java, .NET, C++ etc. | Push model, support OpenWire, STOMP, AMQP, MQTT, JMS | Exclusive Consumer or Exclusive Queues can ensure ordering | Supported | Not Supported | Supported | Supported | Not Supported | Supports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB | Supported | Supported | Supported, depending on storage,if using kahadb it requires a ZooKeeper server | Not Supported | The default configuration is low level, user need to optimize the configuration parameters | Supported |
| Kafka | Java, Scala etc. | Pull model, support TCP | Ensure ordering of messages within a partition | Not Supported | Supported, with async producer | Not Supported | Supported, you can use Kafka Streams to filter messages | Not Supported | High performance file storage | Supported offset indicate | Not Supported | Supported, requires a ZooKeeper server | Not Supported | Kafka uses key-value pairs format for configuration. These values can be supplied either from a file or programmatically. | Supported, use terminal command to expose core metrics |
| RocketMQ | Java, C++, Go | Pull model, support TCP, JMS, OpenMessaging | Ensure strict ordering of messages,and can scale out gracefully | Supported | Supported, with sync mode to avoid message loss | Supported | Supported, property filter expressions based on SQL92 | Supported | High performance and low latency file storage | Supported timestamp and offset two indicates | Not Supported | Supported, Master-Slave model, without another kit | Supported | Work out of box,user only need to pay attention to a few configurations | Supported, rich web and terminal command to expose core metrics |
歡迎關注公眾號:豐極,更多技術學習分享。
參考網址:
