quick start
添加依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
生產者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/**
* 生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組rocketmq支持事務消息,在發送事務消息時,如果事務消息異常(producer掛了),broker端會來回查事務的狀態,這個時候會根據group名稱來查找對應的producer來執行相應的回查邏輯。相當於實現了producer的高可用
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// namesrv地址 多個地址用 ; 隔開 從namesrv上拉取broker信息
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
/**
* 創建消息實例,指定topic,tag,消息內容。tag
*/
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發送消息並獲取發送結果 同步發送
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
SendResult中,有一個sendStatus狀態,表示消息的發送狀態。一共有四種狀態
- FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設置成
SYNC_FLUSH 才會報這個錯誤) 。 - FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設置成SYNC_MASTER 方式,沒有
在設定時間內完成主從同步。 - SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方
式下,並且Broker 被設置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。 - SEND OK :表示發送成功,發送成功的具體含義,比如消息是否已經被存儲到磁盤?消息是否被
同步到了Slave 上?消息在Slave 上是否被寫入磁盤?需要結合所配置的刷盤策略、主從策略來
定。這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是SEND OK
消費者
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//groupName 將多個consumer分組,提高並發處理能力。需要和MessageModel配合
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 多個地址 ;分開 獲取broker地址 並定時向broker發送心跳 可以從master/slave獲取訂閱
consumer.setNamesrvAddr("localhost:9876");
// 兩種消息模式 BROADCASTING CLUSTERING
consumer.setMessageModel(MessageModel.BROADCASTING);
//設置consumer第一次啟動從隊列頭部還是尾部開始消費
//如果非第一次啟動,那么按上一次消費的位置繼續消費(取決於本地的offeset數據)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// topic 可通過tag過濾消息 * 或 null 代表全部
consumer.subscribe("TopicTest", "*");
/**注冊消息處理回調
* MessageListenerConcurrently 普通監聽
* MessageListenerOrderly 順序監聽
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// todo 消息處理邏輯
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消費狀態
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消費狀態
ConsumeConcurrentlyStatus {
// 消費成功
CONSUME_SUCCESS,
// 使用失敗,稍后嘗試使用
RECONSUME_LATER;
消息發送及消費的基本原理
集群部署,一個master可以有多個slave,一個slave只能有一個master.consumer可以從master獲者slave中訂閱消息
2m-2s示例:
rocketMQ 沒有實現master選舉(通過配置文件來指定主從)
當master掛了后 消費者依然能正常消費消息(slave提供讀服務)
通過groupName實現分區,提高消費者的處理能力
消費者
兩種消費者類型
- DefaultMQPushConsumer 由系統控制讀取操作
DefaultMQPushConsumer
自動保存offset,自動做負載均衡
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//groupName 將多個consumer分組,提高並發處理能力。需要和MessageModel配合
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 多個地址 ;分開
consumer.setNamesrvAddr("localhost:9876");
// 兩種消息模式 BROADCASTING CLUSTERING
consumer.setMessageModel(MessageModel.BROADCASTING);
//第一次啟動從 offset頭開始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// topic 可通過tag過濾消息 * 或 null 代表全部
consumer.subscribe("TopicTest", "*");
//注冊消息處理回調
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// todo 消息處理邏輯
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
兩種消息模式 BROADCASTING CLUSTERING:
1、在 Clustering 模式下,同一個 ConsumerGroup ( GroupName 相同 ) 里的每個 Consumer 只消費所訂閱消息 的一部分 內 容, 同一個 ConsumerGroup里所有的 Consumer 消 費 的內 容合起來才是所訂閱 Topic 內 容 的 整體 ,從而達到負載均衡的目的 (也就是集群消費)
2、在 Broadcasting 模式下,同一個 ConsumerGroup 里的每個 Consumer 都能消費到所訂閱 Topic 的全部消息,也就是一個消息會被多次分發,被多個Consumer 消費 。(也就是廣播模式)
通過長輪詢的方式獲取消息
Broker端HOLD住客戶端過來的請求一小段時間,在這個時間內有新消息到達就利用現有的連接立刻返回消息給Consumer。主動權在Consumer
好處是客戶端能充分利用資源,不至於處理不過來
流量控制
DefaultMQPullConsumer
需要自己維護offset,需要通過遍歷MessageQueue獲取消息
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
// 獲取分片
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
// 獲取到消息
case FOUND:
break;
case NO_MATCHED_MSG:
break;
// 沒有新消息
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
Consumer的啟動、關閉
DefaultMQPushConsumer啟動時不會檢查nameServer地址的正確或者可用性
// 從指定topic中拉取所有消息隊列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
可以通過上面的方法主動拉取消息隊列來判斷nameServer的可用性
關閉時調用shutdown()即可
DefaultMQPullConsumer關閉或者異常退出時需要將offset保存起來
才能保證下次啟動時拉取消息的正確性
consumerGroup:位於同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;同一個group中可以配置多個consumer,可以提高消費端的並發消費能力以及容災
和kafka一樣,多個consumer會對消息做負載均衡,意味着同一個topic下的不messageQueue會分發給同一個group中的不同consumer
消費端的負載均衡
和kafka一樣,消費端也會針對Message Queue做負載均衡,使得每個消費者能夠合理的消費多個分區的消息。
消費端會通過RebalanceService線程,10秒鍾做一次基於topic下的所有隊列負載
-
消費端遍歷自己的所有topic,依次調rebalanceByTopic
-
根據topic獲取此topic下的所有queue
-
選擇一台broker獲取基於group的所有消費端(有心跳向所有broker注冊客戶端信息)
-
選擇隊列分配策略實例AllocateMessageQueueStrategy執行分配算法
什么時候觸發負載均衡
- 消費者啟動之后
- 消費者數量發生變更
- 每10秒會觸發檢查一次rebalance
分配算法
RocketMQ提供了6中分區的分配算法
- (AllocateMessageQueueAveragely)平均分配算法(默認)
- (AllocateMessageQueueAveragelyByCircle)環狀分配消息隊列
- (AllocateMessageQueueByConfig)按照配置來分配隊列: 根據用戶指定的配置來進行負載
- (AllocateMessageQueueByMachineRoom)按照指定機房來配置隊列
- (AllocateMachineRoomNearby)按照就近機房來配置隊列:
- (AllocateMessageQueueConsistentHash)一致性hash,根據消費者的cid進行
生產者
DefaultMQProducer 默認生產者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// producerGroupName
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// namesrv地址 多個地址用 ; 隔開
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 返回
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消息返回狀態:SendResult.sendStatus
1、FLUSH DISK TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策略設置成 SYNC FLUSH 才會報這個錯誤) 。
2、 FLUSH SLAVE TIMEOUT :表示在主備方式下,並且 Broker 被設置成 SYNC MASTER 方式,沒有在設定時間內完成主從同步 。
3、SLAVE NOT AVAILABLE : 這個狀態產生的場景和 FLUSH SLAVETIMEOUT 類似, 表示在主備 方式下,並且 Broker 被設置成 SYNCMASTER ,但是沒有找到被配置成 S lave 的 Broker 。
4、SEND OK :表示發送成功,發送成功的具體含義,比如消息是否已經被存儲到融盤?消息是否被同步到了 S lave 上?消息在 S lave 上是否被寫人磁盤?需要結合所配置的刷盤策略、主從策略來定 。 這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是 SEND OK
延遲消息
通過Message.setDelayTimeLevel ( int level ) 方法設置延遲時間,只支持預設值(1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/7m/8m/9m/1 Om/20m/30m/1 h/2h )。 比如setDelayTimeLevel(3)表示延遲 10s 。
自定義消息發送規則
實現MessageQueueSelector接口
三種默認實現:
SelectMessageQueueByHash
SelectMessageQueueByMachineRoom
SelectMessageQueueByRandom
自定義消息發送可以將消息發送到指定的MessageQueue里
對事物的支持
new TransactionMQProducer("groupName");
設置生產者group,當一個producer掛掉了,消息會分發到其它producer保證消息一定會被回查確定
消息的可靠性原則
只有消費者返回CONSUME_SUCCESS消費成功的才會認為消費成功
返回ConsumeConcurrentlyStatus.RECONSUME_LATER消費失敗會被重試
消息衰減重試
為了保證消息肯定至少被消費一次,RocketMQ會把這批消息重新發回到broker,在延遲的某個時間點
(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續
失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預
可以修改broker-a.conf文件
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重試消息的處理機制
一般情況下我們在實際生產中是不需要重試16次,這樣既浪費時間又浪費性能,理論上當嘗試重復次數
達到我們想要的結果時如果還是消費失敗,那么我們需要將對應的消息進行記錄,並且結束重復嘗試
consumer.registerMessageListener((MessageListenerConcurrently) (list,
consumeOrderlyContext) -> {
for (MessageExt messageExt : list) {
if(messageExt.getReconsumeTimes()==3) {
//可以將對應的數據保存到數據庫,以便人工干預
System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
} r
eturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
死信隊列
RocketMQ會為每個消費組都設置一個Topic命名為“%DLQ%+consumerGroup"的死信隊列。