概述
在上文中,我們討論了消費者對於消息拉取的實現,對於 RocketMQ 這個黑盒的心臟部分,我們順着消息的發送流程已經將其剖析了大半部分。本章我們不妨乘勝追擊,接着討論各種不同的消息的原理與實現。
事務消息
概念
RocketMQ 中的事務消息功能,實際上是 分布式事務中的本地事務表 的實現,只不過,在這里用消息中間件來代替了數據庫,同時也幫我們做好了回查的操作。
在這點上,RocketMQ 和 Kafka 是截然不同的,kafka 的事務是用來實現 Exacltly Once 語義,且該語義主要用來流計算中,即在 "從 Topic 中讀 -> 計算 -> 存到 Topic" 保證不被重復計算。
事務流程
- 客戶端發送 half 消息
吐槽一下為什么要叫半消息(half message),叫 prepare 消息不是更直觀嗎
- Broker 將 half 消息持久化
- 客戶端根據事務執行結果,發送 Commit / Rollback 消息
- Broker 收到 Commit 時,將事務消息對消費者可見。收到 Rollback 時,將消息丟棄
補償
- Broker 過久未收到事務執行結果,詢問客戶端執行結果
- 客戶端收到結果查詢請求,執行回查方法,發送 Commit / Rollback 方法
- Broker 根據事務執行結果做出對應處理
源碼流程
第一步
在設置好了事務監聽器后(執行事務 與 事務回查),就可以發送事務消息
在將事務消息交給發送方法后,客戶端首先會為消息添加事務消息的標識
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
然后將該事務消息會像普通的同步消息一樣發送(且是同步發送)
sendResult = this.send(msg);
第二步
在 Broker 端接收到消息以后,會走與普通消息相同的底層通道(因為這個消息本身就只是個加上了 事務flag 的普通消息),然后由 TransactionalMessageService
來對這個消息進行額外處理。
首先會對該消息放入 real topic
屬性和 real queue
屬性,然后將消息 Topic 替換為用於處理所有事務消息的特殊的 Topic,當然該 Topic 對消費者是不可見的。
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 設置標記為未收到結果
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 替換到特殊的 Topic (RMQ_SYS_TRANS_HALF_TOPIC)
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
完成后,會送到 MessageStore 像普通消息一樣處理
普通消息的具體流程見 RocketMQ源碼詳解 | Broker篇 · 其二:文件系統
第三步
回到 Producer 端,在事務消息發送完成后,該方法會使用專門的線程池執行事務
// 2.執行本地事務,更新事務獲取狀態
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
然后對本地的事務執行狀態進行處理,也就是將該執行狀態上報
this.endTransaction(msg, sendResult, localTransactionState, localException);
這里會發送一條 oneway 命令給 Broker 端,且使用的是 RequestCode.END_TRANSACTION
請求碼
// 事務結果報告(可能是 commit 或 rollback)
public static final int END_TRANSACTION = 37;
完成處理后,該方法會將事務的發送結果和本地事務的執行結構都返回給上層 API
第四步
在 Broker 端,這里會由 EndTransactionProcessor
處理器來處理該請求碼
然后,根據事務的執行結果來做不同的處理
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 事務執行成功,嘗試完成事務
// 獲取 half 消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
if (res.getCode() == ResponseCode.SUCCESS) {
// 將 half 消息取出,構造真實消息,然后投入實際上的 Topic
/* pass */
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
/*
* 找到半消息,進行刪除
* 刪除並不是物理上的刪除,因為物理上的刪除的代價十分的高昂,而是寫入一條具有相同事務id的消息到 op Topic
*/
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
}
如果需要回滾,則對相應的半消息進行刪除,且和上面一樣,並不是物理上的刪除,而是發送具有相同事務 id 的消息到 OP Topic,來標記這個事務已經完成了(Commit/Rollback), OP Topic 也是一個特殊的 Topic,同樣對消費者不可見。
if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 事務執行失敗,進行 half 消息的回滾
// 首先找到 half 消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 進行刪除
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
當這些都做完后,一次事務就完成了。
補償
當然啦,以上是順利的情況,我們當然不能指望事務每一次都能執行成功、網絡分區和宕機事件永遠不會發生。
在一段時間后,如果客戶端沒有對事務的狀態進行上報(或者上報的狀態不是 Commit 或 Rollback,而是 Unknown), Broker 端當然就要進行事務狀態的回查。
在 BrokerController
啟動的時候,會開啟事務狀態檢測服務,該服務會通過循環調用 TransactionalMessageServiceImpl.check()
方法,不斷的掃描未結束的事務,同時對超過指定時間還不知道狀態的事務進行回查操作。
check()
方法是事務回查的核心,由於很長,我們先來看第一部分(刪減了沒人在意的 Log)
// 首先找到存儲所有 half 消息的 Topic
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
// 對其中每一個 queue 進行檢查
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
// 獲得對應的 op 消息所在的 queue
MessageQueue opQueue = getOpQueue(messageQueue);
// 獲取未處理的 half 消息的起始偏移量
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// 獲取 op 消息的 queue 的起始偏移量
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
// 用來記錄已經被處理了的 op 消息的偏移量
List<Long> doneOpOffset = new ArrayList<>();
// 用來記錄已經完成了的 half 消息的偏移量
// key: halfOffset, value: opOffset
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
在 fillOpRemoveMap
方法中,主要是將 op 消息取出,來標記可以被移除的 half 消息(op 消息的存在代表對應事務的結束)
/**
* 讀取op消息,解析op消息,填充removeMap
*
* @param removeMap 要刪除的半消息,key: halfOffset,value: opOffset
* @param opQueue Op message queue.
* @param pullOffsetOfOp op message queue 的起始偏移量
* @param miniOffset half message queue 的當前最小偏移量
* @param doneOpOffset 存儲已處理的 op 消息
* @return 獲取到的 Op 消息
*/
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
// 首先通過 queue 獲取 op 消息,最大數量為 32 條
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
/* pass: pullResult 消息的意外狀態的處理 */
List<MessageExt> opMsg = pullResult.getMsgFoundList();
for (MessageExt opMessageExt : opMsg) {
// op 消息的 body 存儲的是對應的 half 消息的偏移量, 現在將其取出
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
// 感覺這里的 Tag 並沒有什么意義,無論是 Commit 還是 Rollback 都會加入這個 Tag
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
// 在 已處理偏移量 之前的話則可直接放入 已處理偏移量集合
if (queueOffset < miniOffset) {
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
// 否則放入需要移除的 half 的消息的集合
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
}
}
return pullResult;
}
然后進入到 check
方法的第二部分
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) break;
// 推進最小已處理偏移量
if (removeMap.containsKey(i)) /* 如果該 half 消息存在對應的 op 消息,說明已經被處理了(commit/rollback) */ {
// 取出放入到已處理偏移量隊列
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else /* 否則說明當前 half 消息懸而未決 */ {
// 取出對應的半消息
GetResult getResult = getHalfMsg(messageQueue, i);
/* pass: 半消息不存在時的意外處理 */
/*
* 檢測是否要丟棄或跳過
* 丟棄條件: 當前事務已經超過了最大回查次數(15次)
* 跳過條件: 已經超過了過期文件最大保留時間(72小時)
*/
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
// 處理並推進偏移量
// 具體的處理方法是: 投入 TRANS_CHECK_MAX_TIME_TOPIC 這個 Topic,等待手動處理
listener.resolveDiscardMsg(msgExt);
// 進入到下一個 half 消息
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
break;
}
上面的方法很好理解,只是對於已經被標記結束的事務的處理、和未結束事務的補足
接下來是第三部分,這里將繼續對未結束事務的補足,與進行可能的回查操作
// half 消息具有最小的檢查時間(免疫時間), 檢測時間以內可以跳過回查, 重新投入 half 消息的 Topic
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
break;
}
}
/*
* 對於當前事務的回查操作,需要滿足三個條件之一
* 1.當前 op 消息的集合為空,且已經超過了最小檢查時間(免疫時間)
* 2.最大偏移量的 op 消息的生成時間 已經超過了 最小檢查時間
* 3.關閉最小檢查時間
*/
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// 先將當前 half 消息放回
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// 然后向 Product 發送檢測消息
listener.resolveHalfMsg(msgExt);
} else {
// 否則更新 op 消息集合,以確保能夠斷言該 half 消息的狀態
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
continue;
}
}
newOffset = i + 1;
i++;
}
上面這段代碼主要圍繞 "是否進行回查" 展開,且涉及到 "免疫時間"。
在一個事務消息被發送后,對應事務的執行當然需要一定的執行時間,如果我們不設置這個時間立刻進行回查,那么很有可能時候事務還沒執行完,對於大多數情況下還沒執行完的事務進行回查,毫無疑問帶來的收益很低。所以我們需要設定一個時間,在這個時間內的事務先暫時不回查,這個時間就叫做"免疫時間"。
然后再來看下需要進行回查的三種情況:
- 當 op 消息的集合為空,說明當前還沒有收到讓當前事務結束的通知,且超過了"免疫時間",故回查
- 當前 op 消息最大偏移量的生成時間超過了"免疫時間",說明該事務的提交消息可能丟失了,故回查
- 不啟用 "免疫時間"
其中發送的回查消息的請求碼為 RequestCode.CHECK_TRANSACTION_STATE
,發送的也是 oneway 消息
最后的第四部分,同時更新 half 和 op 消息在 Queue 中的偏移量
// 對所有的 half 消息計算完成后,更新偏移量
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
// 根據已經被標記為完成的 op 消息更新偏移量
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
// 如果不等,說明並不是所有的 op 消息都被標記為完成了
// 所以我們只將偏移量更新到第一個未完成的 op 消息的位置,其后面的 op 消息會在下次重復處理
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
然后在 Producer 這邊,將由 ClientRemotingProcessor.checkTransactionState()
來處理回查操作
// 獲取事務 ID
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
// 從 MQClientFactory 找到注冊的對應 Producer
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 讓 Producer 檢查在對應 IP 上的事務狀態
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
再進入 producer.checkTransactionState()
看看 Producer 是怎樣檢查事務狀態的
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
// 取出當前 Producer 的事務監聽器
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
// 調用其的事務回查方法
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
// 再將事務執行結果其發回給 Broker
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
最后發回的方法做的事情和在一開始發送事務狀態的方法,所做的事情是一樣的。Broker 做的處理也是一樣的。
這樣,補償流程就執行完了。
批量消息
概念
在消息隊列中,批量消息也是一個重要的部分,將消息壓縮在一起發送不僅可以減少帶寬的消耗,還能節省頭部占用的空間。
有點失望的是,RocketMQ 對於批量消息的實現有點"粗糙"了
源碼流程
首先,在調用 send()
的 batch 版本后,會先對批量消息進行校驗
批量消息不允許延時、不允許發送到重試 Topic,且要求發送到的 Topic 必須是同一個 Topic
List<Message> messageList = new ArrayList<Message>(messages.size());
Message first = null;
for (Message message : messages) {
if (message.getDelayTimeLevel() > 0) {
throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
}
if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
throw new UnsupportedOperationException("Retry Group is not supported for batching");
}
if (first == null) {
first = message;
} else {
if (!first.getTopic().equals(message.getTopic())) {
throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
}
if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
}
}
messageList.add(message);
}
MessageBatch messageBatch = new MessageBatch(messageList);
在校驗完成,且都放到一個 List 之后,接下來的步驟和普通的消息發送都差不多,只是在編碼上理所當然的存在着不同
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message : messages) {
// 編碼每一個消息
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
}
// 放到最后的大集合中
byte[] allBytes = new byte[allSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
return allBytes;
}
然后使用 RequestCode.SEND_BATCH_MESSAGE
這個狀態碼發送出去。
在 Broker 端,其投入的過程大體上和普通消息類似,但是其最后的持久化到硬盤時,這塊批量消息被拆分為了普通的單條消息。
即 RocketMQ 使用批量消息只減少了發送時的寬帶傳輸,對於存儲與交給消費者的部分並沒有獲得優化
// 拆分批量消息為每一個普通消息
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
final int msgLen = messagesByteBuff.getInt();
final int bodyLen = msgLen - 40; //only for log, just estimate it
/* pass: 當作普通消息存儲 */
queueOffset++;
msgNum++;
messagesByteBuff.position(msgPos + msgLen);
}
延時消息
概念
在業務中,有時候有一些延時提交任務的需求,這時候就可以使用延時消息,即在投遞一部分時間后才對消費者可見。
不過,在 RocketMQ 中,延遲級別並不支持自定義,而是具有固定的延遲級別。
不過商業版的 阿里雲MQ 可以支持秒精度的自定義延遲時間,果然是為了閹割社區版來賺錢嗎
源碼流程
RocketMQ 對於延時消息的處理主要在於 Broker 端,所以我們只需要看在 Broker 對延時級別的處理。
首先,在 CommitLog
的 put 中,會對延遲級別進行判斷,如果存在,會在這進行進行 Topic 的替換,將其存儲到對應的延遲級別的 Queue
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
然后會被在 DefaultMessageStore
中初始化的 ScheduleMessageService
處理
首先,該服務在啟動時會進行初始化
public void start() {
// 保證只被執行一次
if (started.compareAndSet(false, true)) {
// 加載本地快照
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
// 取出每一個級別
Integer level = entry.getKey();
// 當前延遲級別對應的延遲時間
Long timeDelay = entry.getValue();
// 該延遲級別之前消費到的自己的隊列的偏移量
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
// 每一個延遲級別設置一個定時任務
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 定時持久化各個延遲級別的偏移量
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
每一個延遲級別的 Queue 都有對應的定時任務,且都會執行以下方法
public void executeOnTimeup() {
// 找到自己延遲級別的消費隊列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
// 根據消費偏移量將指定的 MappedFile 文件加載進來
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
// 遍歷每一個消息的索引
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
/* pass */
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) /* 目標時間小於當起時間,可以執行 */ {
// 根據偏移量取出消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
// 將延遲消息恢復成原本消息的樣子
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
/* pass */
// 投入真實的 Topic
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
/* pass: 更新度量信息 */
} catch (Exception e) {
/* pass */
}
}
} else /* 否則,這個消息需要被消費的時間到了再通知我 */ {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
// 更新消費偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// 走到這里,說明暫時沒有需要消費的延時消息
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 小睡一會
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
/* pass */
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
可以看出,延遲消息的實現還是十分簡單的,由於先投入的延時消息必先快於后投入的消息的到期,所以只需要不斷的拉取各個延遲級別對應的隊列 的頭部的延遲消息即可。這也是只支持固定級別的延遲消息帶來的好處。