概述

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
Start
DefaultMQProducer#start
首先進入 start 方法,可以看出主要的功能實現在於 defaultMQProducerImpl.start(),先忽略細枝末節,接着進去看看
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
DefaultMQProducerImpl#start()
然后,我們可以看到會根據當前生產者的狀態來進行不同的行為
記得在設計模式里,這叫做"狀態模式"
具體的狀態有:
- CREATE_JUST
- RUNNING
- START_FAILED
- SHUTDOWN_ALREADY
在進入 start 后狀態會變成 START_FAILED
,完成后變成 RUNNING
狀態
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// -- 跳過 --
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
進入 CREATE_JUST
后開始對元信息進行檢查與注冊
this.checkConfig();
// 如果自定義了生產者組,則修改PID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 從MQ工廠獲取實例
// MQ工廠保證ClientID唯一
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注冊生產者組
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
然后具體看 mQClientFactory.start()
方法
MQClientInstance#start
MQClientInstance 是由一個 JAVA 程序所共用的,其可以從 ClientId 的生成方法看出
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
由以上代碼可以得知,一台機器上的一個 JVM 進程只擁有一個實例,所以以下的初始化方法也是全局的
首先對當前對象加鎖來避免多線程帶來的問題,然后又進行了一次狀態判斷來保證狀態正確。然后就啟動了一堆服務。
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// NameSrv 地址為空時,嘗試通過設定的地址使用HTTP獲取NameSrv地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 開啟 Netty 的請求響應的 Channel
this.mQClientAPIImpl.start();
// 開啟調度任務
this.startScheduledTask();
// 開啟拉取服務
this.pullMessageService.start();
// 開啟再均衡服務
this.rebalanceService.start();
// 開啟push服務
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
其中的 pullMessageService
和 rebalanceService
等服務都是繼承於 ServiceThread
抽象類,這個類被用於在多線程的情況下保證服務啟動的正確性。
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
// 只會被啟動一次
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
// 啟動子類的實現線程,從子類獲取服務名
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
其中 RebalanceService
和 PullMessageService
我們在其他章節再具體分析。
此外,其中的 startScheduledTask()
又開啟了一些定時運行的任務
// 從遠程服務器不斷更新 NameServer 地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 定時從NameServer更新Topic的路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 定期清除離線的Broker地址,同時發送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 持久化所有的擁有的消費者偏移量
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 動態對所有消費者的線程池容量進行調整
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
從這里可以看出,消費者不僅是在內存保存了偏移量,還會定期持久化以保證不丟失。
運行這些任務的,是一個 SingleThreadScheduledExecutor
,這是一個由一個線程去執行需要被定時執行的任務的線程池。
在完成以上任務后,回到我們的 DefaultMQProducerImpl.start() 方法看剩下的兩段
// 為所有Broker發送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 開啟所有的調度任務
this.startScheduledTask();
首先來看第一個方法
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 發送心跳,但第一次的時候是空的,所以不用考慮
this.sendHeartbeatToAllBroker();
// 上傳過濾器Class,消費者相關
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
在發送心跳的時候,由於這時候還沒有從 NameServer 獲取 Broker 地址,所以不會發送,而上傳過濾器 Class 我們留到消費者的章節再講。
第二個方法比較簡單,開啟一個調度任務來處理所有的 Request 狀態,對異步的請求根據狀態處理回調函數。
private void startScheduledTask() {
// 原子增加生產者數量
if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 處理異步回調請求,掃描所有Request狀態
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
}
}
需要注意的是,這個異步請求指的並不是在
send
中的異步回調機制,而是在 rockmq-4.7.0 后加入的 Request-Reply 特性,用來支持 RPC 調用。實現的方法是:
- Producer 投遞消息到 Broker,然后阻塞直到收到 返回結果
- Broker 收到后像正常消息一樣 ack,Consumer 通過 pull\push 從 Broker 中獲取
- Consumer 獲取后進行處理,然后響應 返回結果
- Broker 收到 返回結果 后將其發回對應的 Producer
核心的方法與類有:
DefaultMQProducerImpl#request() RequestResponseFuture#waitResponseMessage()#putResponseMessage() MessageUtil#createReplyMessage() ReplyMessageProcessor#processReplyMessageRequest() ClientRemotingProcessor#receiveReplyMessage()
自此,Start
方法就完成了。
Start 總體流程
-
設置生產者組
-
檢查當前狀態,只允許為 CreateJust
-
從 MQClientManage 獲取 MQClient 工廠
3.1 已經被創建則直接返回
-
注冊生產者組
-
啟動 MQClient 工廠
5.1 NameSrv 地址為空時,嘗試通過設定的地址使用HTTP獲取NameSrv地址
5.2 開啟 Netty 的請求響應的 Channel
5.3 開啟調度任務
5.3.1 從遠程服務器不斷更新 NameServer 地址
5.3.2 定時從NameServer更新Topic的路由信息
5.3.3 定期清除離線的Broker地址,同時發送心跳
5.3.4 持久化所有的擁有的消費者偏移量
5.3.5 動態對所有消費者的線程池容量進行調整
5.4 開啟拉取服務
5.5 開啟再均衡服務
5.6 開啟push服務
-
啟動 trace dispatcher 服務
Send
進入 send 后會進行完整檢查,且默認工作模式為 Sync,send 的主要工作的方法如下
DefaultMQProducerImpl#sendDefaultImpl
這個方法很長,我們一段段來看
// 確保是 Start 狀態
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 得到Topic信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 返回為Null的結果只有Topic不存在且自動創建Topic沒有打開
if (topicPublishInfo != null && topicPublishInfo.ok()) { /* pass */ }
// 檢查下是不是NameSev地址填錯了
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
首先是確保狀態正確,然后記錄啟動時間作為度量和超時檢查,然后調用 tryToFindTopicPublishInfo
獲得具體的 Topic 信息以發送消息,如果找不到則拋異常。
然后進入 tryToFindTopicPublishInfo
看看具體實現
// 尋找消息應該被發到哪
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 本地沒有、或未就緒,從NameServ請求
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 沒有信息則從NameServ拉取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 拉取不到?說明我們要發送的目標Topic不存在
// 那就打開isDefault開關,向默認Topic發送
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
這個方法可以看到,在本地會有一個本地 Topic 表,沒有會嘗試去 NameServer 拉取。
而拉取分為兩階段,第一次拉取會去找對應的 Topic ,失敗則第二次會去找 Default Topic。為什么會這樣做呢?我們都知道自動創建 Topic 只會在 Broker 打開自動創建 Topic 的開關才有效,而具體的實現方法需要我們再往下看
MQClientInstance#updateTopicRouteInfoFromNameServer
又進入了 MQClientInstance
,我們剛剛已經了解到它會由 MQClientManager
創建一個全局的實例,而它內部有幾個重要的 Map。
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable;
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable;
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable;
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable;
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable;
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable;
從變量名和和注釋我們不難理解它們是做什么的。我們可以發現, Client 的元信息是由所有的消費者和生產者共享。
回歸正題,接着看更新路由信息的方法
TopicRouteData topicRouteData;
// 使用默認 Topic ,因為目標Topic不存在,所以需要新建
if (isDefault && defaultMQProducer != null) {
// 獲取默認Topic路由信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
// 獲得了信息后,接下來都是新的Topic都是繼承於默認Topic的信息
if (topicRouteData != null) {
// 修正讀寫Queue數量
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
這里分為直接從 NameServer 獲取和獲取 default topic 后對其元信息進行繼承。
關於 ReadQueue 和 WriteQueue
可讀 Queue 代表消費者可以讀取的 Queue,可寫 Queue 代表生產者可以寫入的 Queue。
將它們進行分離的主要是為了方便的動態調整 Queue 的大小
其中 getTopicRouteInfoFromNameServer
主要是 Netty 使用 RPC 獲取 Topic,具體內容在其他章看看
在獲取了路由信息后,就開始對其進行檢查更改的項目
if (topicRouteData != null) {
// 檢查路由信息是否發生改變
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
// 發生改變則進行更改
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 更新路由地址
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// 在這,被新建的Topic放入了本地生產者表
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
// 重新放入
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
如果是 default topic 的話,就會在這一步放入本地。
這是因為,在 Broker 中,如果開啟了自動創建 Topic 的選項,便會創建一個和 default 同名(default 的默認名稱為 TBW102)的 Topic,並以這個 Topic 來創建新的 Topic ,但若沒有開啟,則會因為找不到 TopicName 而返回錯誤。
DefaultMQProducerImpl#sendDefaultImpl
我們再回到發送方法來,在獲取到要發送的 Topic 的元信息后,就可以開始發送了
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 最多發送timesTotal次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 選擇一個Queue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
/* pass */
} else {
break;
}
}
在這里,Producer 會根據發送類型來選擇發送次數,同步會選擇默認的重試次數加一,而異步和oneway則只會嘗試一次
在發送流程中,首先需要選擇 Topic 中的 Queue
MQFaultStrategy#selectOneMessageQueue
從類名可以看出,這個類主要實現故障退避功能,同時使用輪詢的方式來選擇 Queue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 故障退避: 當要發送的Broker在上一次發送中延遲了較久的時間或發送失敗,會進行一段時間的退避
if (this.sendLatencyFaultEnable) {
try {
// 首先,增加線程本地的輪詢計數
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 如果可用(不需要進行退避),則直接使用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 在所有Broker都需要退避的情況下,即沒有最優解,選擇次優
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
// 如果可寫 Queue 已經為零,說明已經不在了
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// 如果沒有開啟開關,則選擇一個不是上一次發送的Broker來發送
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
大多數內容直接看代碼就能理解,所以我們主要看下 pickOneAtLeast
方法
LatencyFaultToleranceImpl#pickOneAtLeast
這個方法用於在所有 Broker 都需要"故障退避"的時候,選擇一個可能最好的
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}
if (!tmpList.isEmpty()) {
Collections.shuffle(tmpList);
Collections.sort(tmpList);
final int half = tmpList.size() / 2;
if (half <= 0) {
return tmpList.get(0).getName();
} else {
final int i = this.whichItemWorst.incrementAndGet() % half;
return tmpList.get(i).getName();
}
}
return null;
}
這個方法主要的策略是:
對當前的處於"故障退避"狀態的 Broker 使用洗牌算法(這有啥意義...),然后進行排序,然后選擇所有元素的前半段使用輪詢策略
排序方案:是否可用 > 上次響應時間(短>長)(發生網絡分區的時候會相同)> 未來可用時間點(近>遠)
DefaultMQProducerImpl#sendDefaultImpl
選擇完 Queue 后,就要進行實際的發送了
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
// 在重新發送期間使用命名空間重置主題
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 消耗時間的度量
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 發送MSG
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 更新故障退避功能
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// 根據發送方式選擇返回結果
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (XXX e) {
/* 這里刪除了一堆Expection catch, 它們都主要做了更新"故障退避", 然后拋出 */
}
可以看出,實際的發送方法是 sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
// 不在內存中,嘗試獲取或創建
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
// 是否使用vip,broker有兩個端口共同服務
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
// 對於MessageBatch,生成過程中已經設置了ID
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
// 將實例名設置為命名空間
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
// 是否為壓縮消息
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// 發送消息的校驗鈎子
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
// 發送消息前的鈎子
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
// 組裝消息頭
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// 發往重發Topic的消息
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
// 從命名空間中解包
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
// 超時檢查
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 發送異步消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
// 發送消息后的鈎子
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
這個方法干了不少臟活,不過發送的具體實現還是通過 MQClientAPIImpl.sendMessage
來實現。
send 發送流程
-
檢查消息
-
從消息的目的 TopicName 中獲取元信息;若獲取不到 Topic,則拋異常
2.1 從本地獲取,沒有則從 NameServer 獲取
2.1.1 從 NameServer 獲取 Topic 元信息,沒有則直接返回
2.1.2 更新獲取的 Topic 的路由信息
2.2 將獲取的 Topic 直接返回,若 NameServer 也沒,則進行創建
2.2.1 獲取默認 Topic;獲取失敗直接返回
2.2.2 繼承該 Topic 的信息來進行更改以作為新 Topic
-
從 Topic 中選擇 Queue
3.1 排除掉在故障退避的 Broker 后,將下一個 Broker 所在的 Queue 返回
3.2 所有 Broker 都需要退避下,選擇次優 Broker
-
發送消息;失敗則退回第三步
4.1 Vip 檢查
4.2 消息類型檢查
4.3 調用鈎子
4.4 組裝消息頭
4.5 發送消息
-
更新故障退避信息
-
根據發送方式返回結果