一、引言
RocketMQ是一款優秀的分布式消息中間件,在各方面的性能都比目前已有的消息隊列要好,RocketMQ默認采用長輪詢的拉模式, 單機支持千萬級別的消息堆積,可以非常好的應用在海量消息系統中。
RocketMQ主要由 Producer、Broker、Consumer、Namesvr 等組件組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息,Namesvr負責存儲元數據,各組件的主要功能如下:
-
消息生產者(Producer):負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到Broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
-
消息消費者(Consumer):負責消費消息,一般是后台系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
-
代理服務器(Broker Server):消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
-
名字服務(Name Server):名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。
-
生產者組(Producer Group):同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。
-
消費者組(Consumer Group):同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。
RocketMQ整體消息處理邏輯上以Topic維度進行生產消費、物理上會存儲到具體的Broker上的某個MessageQueue當中,正因為一個Topic會存在多個Broker節點上的多個MessageQueue,所以自然而然就產生了消息生產消費的負載均衡需求。
本篇文章分析的核心在於介紹RocketMQ的消息生產者(Producer)和消息消費者(Consumer)在整個消息的生產消費過程中如何實現負載均衡以及其中的實現細節。
二、RocketMQ的整體架構

(圖片來自於Apache RocketMQ)
RocketMQ架構上主要分為四部分,如上圖所示:
-
Producer:消息發布的角色,支持分布式集群方式部署。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。
-
Consumer:消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。
-
NameServer:NameServer是一個非常簡單的Topic路由注冊中心,支持分布式集群方式部署,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現。
-
BrokerServer:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證,支持分布式集群方式部署。

RocketMQ的Topic的物理分布如上圖所示:
Topic作為消息生產和消費的邏輯概念,具體的消息存儲分布在不同的Broker當中。
Broker中的Queue是Topic對應消息的物理存儲單元。
在RocketMQ的整體設計理念當中,消息的生產消費以Topic維度進行,每個Topic會在RocketMQ的集群中的Broker節點創建對應的MessageQueue。
producer生產消息的過程本質上就是選擇Topic在Broker的所有的MessageQueue並按照一定的規則選擇其中一個進行消息發送,正常情況的策略是輪詢。
consumer消費消息的過程本質上就是一個訂閱同一個Topic的consumerGroup下的每個consumer按照一定的規則負責Topic下一部分MessageQueue進行消費。
在RocketMQ整個消息的生命周期內,不管是生產消息還是消費消息都會涉及到負載均衡的概念,消息的生成過程中主要涉及到Broker選擇的負載均衡,消息的消費過程主要涉及多consumer和多Broker之間的負責均衡。
三、producer消息生產過程

producer消息生產過程:
-
producer首先訪問namesvr獲取路由信息,namesvr存儲Topic維度的所有路由信息(包括每個topic在每個Broker的隊列分布情況)。
-
producer解析路由信息生成本地的路由信息,解析Topic在Broker隊列信息並轉化為本地的消息生產的路由信息。
-
producer根據本地路由信息向Broker發送消息,選擇本地路由中具體的Broker進行消息發送。
3.1 路由同步過程
public class MQClientInstance {
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 省略對應的代碼
} else {
// 1、負責查詢指定的Topic對應的路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
// 2、比較路由數據topicRouteData是否發生變更
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
}
// 3、解析路由信息轉化為生產者的路由信息和消費者的路由信息
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// 生成生產者對應的Topic信息
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// 保存到本地生產者路由表當中
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
}
} finally {
this.lockNamesrv.unlock();
}
} else {
}
} catch (InterruptedException e) {
}
return false;
}
}
路由同步過程:
-
路由同步過程是消息生產者發送消息的前置條件,沒有路由的同步就無法感知具體發往那個Broker節點。
-
路由同步主要負責查詢指定的Topic對應的路由信息,比較路由數據topicRouteData是否發生變更,最終解析路由信息轉化為生產者的路由信息和消費者的路由信息。
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
// 按照broker維度保存的Queue信息
private List<QueueData> queueDatas;
// 按照broker維度保存的broker信息
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
public class QueueData implements Comparable<QueueData> {
// broker的名稱
private String brokerName;
// 讀隊列大小
private int readQueueNums;
// 寫隊列大小
private int writeQueueNums;
// 讀寫權限
private int perm;
private int topicSynFlag;
}
public class BrokerData implements Comparable<BrokerData> {
// broker所屬集群信息
private String cluster;
// broker的名稱
private String brokerName;
// broker對應的ip地址信息
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
private final Random random = new Random();
}
--------------------------------------------------------------------------------------------------
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
// 最細粒度的隊列信息
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
}
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
// Topic信息
private String topic;
// 所屬的brokerName信息
private String brokerName;
// Topic下的隊列信息Id
private int queueId;
}
路由解析過程:
-
TopicRouteData核心變量QueueData保存每個Broker的隊列信息,BrokerData保存Broker的地址信息。
-
TopicPublishInfo核心變量MessageQueue保存最細粒度的隊列信息。
-
producer負責將從namesvr獲取的TopicRouteData轉化為producer本地的TopicPublishInfo。
public class MQClientInstance {
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
// 省略相關代碼
} else {
List<QueueData> qds = route.getQueueDatas();
// 按照brokerName進行排序
Collections.sort(qds);
// 遍歷所有broker生成隊列維度信息
for (QueueData qd : qds) {
// 具備寫能力的QueueData能夠用於隊列生成
if (PermName.isWriteable(qd.getPerm())) {
// 遍歷獲得指定brokerData進行異常條件過濾
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
// 遍歷QueueData的寫隊列的數量大小,生成MessageQueue保存指定TopicPublishInfo
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
return info;
}
}
路由生成過程:
-
路由生成過程主要是根據QueueData的BrokerName和writeQueueNums來生成MessageQueue 對象。
-
MessageQueue是消息發送過程中選擇的最細粒度的可發送消息的隊列。
{
"TBW102": [{
"brokerName": "broker-a",
"perm": 7,
"readQueueNums": 8,
"topicSynFlag": 0,
"writeQueueNums": 8
}, {
"brokerName": "broker-b",
"perm": 7,
"readQueueNums": 8,
"topicSynFlag": 0,
"writeQueueNums": 8
}]
}
路由解析舉例:
-
topic(TBW102)在broker-a和broker-b上存在隊列信息,其中讀寫隊列個數都為8。
-
先按照broker-a、broker-b的名字順序針對broker信息進行排序。
-
針對broker-a會生成8個topic為TBW102的MessageQueue對象,queueId分別是0-7。
-
針對broker-b會生成8個topic為TBW102的MessageQueue對象,queueId分別是0-7。
-
topic(名為TBW102)的TopicPublishInfo整體包含16個MessageQueue對象,其中有8個broker-a的MessageQueue,有8個broker-b的MessageQueue。
-
消息發送過程中的路由選擇就是從這16個MessageQueue對象當中獲取一個進行消息發送。
3.2 負載均衡過程
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 1、查詢消息發送的TopicPublishInfo信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
String[] brokersSent = new String[timesTotal];
// 根據重試次數進行消息發送
for (; times < timesTotal; times++) {
// 記錄上次發送失敗的brokerName
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 2、從TopicPublishInfo獲取發送消息的隊列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// 3、執行發送並判斷發送結果,如果發送失敗根據重試次數選擇消息隊列進行重新發送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
switch (communicationMode) {
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (MQBrokerException e) {
// 省略相關代碼
} catch (InterruptedException e) {
// 省略相關代碼
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
}
}
}
消息發送過程:
-
查詢Topic對應的路由信息對象TopicPublishInfo。
-
從TopicPublishInfo中通過selectOneMessageQueue獲取發送消息的隊列,該隊列代表具體落到具體的Broker的queue隊列當中。
-
執行發送並判斷發送結果,如果發送失敗根據重試次數選擇消息隊列進行重新發送,重新選擇隊列會避開上一次發送失敗的Broker的隊列。
public class TopicPublishInfo {
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 按照輪詢進行選擇發送的MessageQueue
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 避開上一次上一次發送失敗的MessageQueue
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
}
路由選擇過程:
-
MessageQueue的選擇按照輪詢進行選擇,通過全局維護索引進行累加取模選擇發送隊列。
-
MessageQueue的選擇過程中會避開上一次發送失敗Broker對應的MessageQueue。

Producer消息發送示意圖:
-
某Topic的隊列分布為Broker_A_Queue1、Broker_A_Queue2、Broker_B_Queue1、Broker_B_Queue2、Broker_C_Queue1、Broker_C_Queue2,根據輪詢策略依次進行選擇。
-
發送失敗的場景下如Broker_A_Queue1發送失敗那么就會跳過Broker_A選擇Broker_B_Queue1進行發送。
四、consumer消息消費過程

consumer消息消費過程:
-
consumer訪問namesvr同步topic對應的路由信息。
-
consumer在本地解析遠程路由信息並保存到本地。
-
consumer在本地進行Reblance負載均衡確定本節點負責消費的MessageQueue。
-
consumer訪問Broker消費指定的MessageQueue的消息。
4.1 路由同步過程
public class MQClientInstance {
// 1、啟動定時任務從namesvr定時同步路由信息
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
}
public void updateTopicRouteInfoFromNameServer() {
Set<String> topicList = new HashSet<String>();
// 遍歷所有的consumer訂閱的Topic並從namesvr獲取路由信息
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 省略代碼
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// 構建consumer側的路由信息
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
}
} finally {
this.lockNamesrv.unlock();
}
}
} catch (InterruptedException e) {
}
return false;
}
}
路由同步過程:
-
路由同步過程是消息消費者消費消息的前置條件,沒有路由的同步就無法感知具體待消費的消息的Broker節點。
-
consumer節點通過定時任務定期從namesvr同步該消費節點訂閱的topic的路由信息。
-
consumer通過updateTopicSubscribeInfo將同步的路由信息構建成本地的路由信息並用以后續的負責均衡。
4.2 負載均衡過程
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
}
負載均衡過程:
-
consumer通過RebalanceService來定期進行重新負載均衡。
-
RebalanceService的核心在於完成MessageQueue和consumer的分配關系。
public abstract class RebalanceImpl {
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// 省略相關代碼
break;
}
case CLUSTERING: { // 集群模式下的負載均衡
// 1、獲取topic下所有的MessageQueue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 2、獲取topic下該consumerGroup下所有的consumer對象
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// 3、開始重新分配進行rebalance
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 4、通過分配策略重新進行分配
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 5、根據分配結果執行真正的rebalance動作
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
重新分配流程:
-
獲取topic下所有的MessageQueue。
-
獲取topic下該consumerGroup下所有的consumer的cid(如192.168.0.8@15958)。
-
針對mqAll和cidAll進行排序,mqAll排序順序按照先BrokerName后BrokerId,cidAll排序按照字符串排序。
-
通過分配策略
-
AllocateMessageQueueStrategy重新分配。
-
根據分配結果執行真正的rebalance動作。
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 核心邏輯計算開始
// 計算當前cid的下標
int index = cidAll.indexOf(currentCID);
// 計算多余的模值
int mod = mqAll.size() % cidAll.size();
// 計算平均大小
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// 計算起始下標
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 計算范圍大小
int range = Math.min(averageSize, mqAll.size() - startIndex);
// 組裝結果
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
// 核心邏輯計算結束
@Override
public String getName() {
return "AVG";
}
}
------------------------------------------------------------------------------------
rocketMq的集群存在3個broker,分別是broker_a、broker_b、broker_c。
rocketMq上存在名為topic_demo的topic,writeQueue寫隊列數量為3,分布在3個broker。
排序后的mqAll的大小為9,依次為
[broker_a_0 broker_a_1 broker_a_2 broker_b_0 broker_b_1 broker_b_2 broker_c_0 broker_c_1 broker_c_2]
rocketMq存在包含4個consumer的consumer_group,排序后cidAll依次為
[192.168.0.6@15956 192.168.0.7@15957 192.168.0.8@15958 192.168.0.9@15959]
192.168.0.6@15956 的分配MessageQueue結算過程
index:0
mod:9%4=1
averageSize:9 / 4 + 1 = 3
startIndex:0
range:3
messageQueue:[broker_a_0、broker_a_1、broker_a_2]
192.168.0.6@15957 的分配MessageQueue結算過程
index:1
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:3
range:2
messageQueue:[broker_b_0、broker_b_1]
192.168.0.6@15958 的分配MessageQueue結算過程
index:2
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:5
range:2
messageQueue:[broker_b_2、broker_c_0]
192.168.0.6@15959 的分配MessageQueue結算過程
index:3
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:7
range:2
messageQueue:[broker_c_1、broker_c_2]
分配策略分析:
- 整體分配策略可以參考上圖的具體例子,可以更好的理解分配的邏輯。

consumer的分配:
-
同一個consumerGroup下的consumer對象會分配到同一個Topic下不同的MessageQueue。
-
每個MessageQueue最終會分配到具體的consumer當中。
五、RocketMQ指定機器消費設計思路
日常測試環境當中會存在多台consumer進行消費,但實際開發當中某台consumer新上了功能后希望消息只由該機器進行消費進行邏輯覆蓋,這個時候consumerGroup的集群模式就會給我們造成困擾,因為消費負載均衡的原因不確定消息具體由那台consumer進行消費。當然我們可以通過介入consumer的負載均衡機制來實現指定機器消費。
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 通過改寫這部分邏輯,增加判斷是否是指定IP的機器,如果不是直接返回空列表表示該機器不負責消費
if (!cidAll.contains(currentCID)) {
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
}
consumer負載均衡策略改寫:
-
通過改寫負載均衡策略AllocateMessageQueueAveragely的allocate機制保證只有指定IP的機器能夠進行消費。
-
通過IP進行判斷是基於RocketMQ的cid格式是192.168.0.6@15956,其中前面的IP地址就是對於的消費機器的ip地址,整個方案可行且可以實際落地。
六、小結
本文主要介紹了RocketMQ在生產和消費過程中的負載均衡機制,結合源碼和實際案例力求給讀者一個易於理解的技術普及,希望能對讀者有參考和借鑒價值。囿於文章篇幅,有些方面未涉及,也有很多技術細節未詳細闡述,如有疑問歡迎繼續交流。
作者:vivo互聯網服務器團隊-Wang Zhi
