RocketMQ源碼詳解 | Broker篇 · 其四:事務消息、批量消息、延遲消息


概述

在上文中,我們討論了消費者對於消息拉取的實現,對於 RocketMQ 這個黑盒的心臟部分,我們順着消息的發送流程已經將其剖析了大半部分。本章我們不妨乘勝追擊,接着討論各種不同的消息的原理與實現。



事務消息

概念

RocketMQ 中的事務消息功能,實際上是 分布式事務中的本地事務表 的實現,只不過,在這里用消息中間件來代替了數據庫,同時也幫我們做好了回查的操作。

在這點上,RocketMQ 和 Kafka 是截然不同的,kafka 的事務是用來實現 Exacltly Once 語義,且該語義主要用來流計算中,即在 "從 Topic 中讀 -> 計算 -> 存到 Topic" 保證不被重復計算。


事務流程
  1. 客戶端發送 half 消息

吐槽一下為什么要叫半消息(half message),叫 prepare 消息不是更直觀嗎

  1. Broker 將 half 消息持久化
  2. 客戶端根據事務執行結果,發送 Commit / Rollback 消息
  3. Broker 收到 Commit 時,將事務消息對消費者可見。收到 Rollback 時,將消息丟棄
補償
  1. Broker 過久未收到事務執行結果,詢問客戶端執行結果
  2. 客戶端收到結果查詢請求,執行回查方法,發送 Commit / Rollback 方法
  3. Broker 根據事務執行結果做出對應處理


源碼流程

第一步

在設置好了事務監聽器后(執行事務 與 事務回查),就可以發送事務消息

在將事務消息交給發送方法后,客戶端首先會為消息添加事務消息的標識

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

然后將該事務消息會像普通的同步消息一樣發送(且是同步發送)

sendResult = this.send(msg);

具體發送流程見:RocketMQ源碼詳解 | Producer篇 · 其一:Start,然后 Send 一條消息


第二步

在 Broker 端接收到消息以后,會走與普通消息相同的底層通道(因為這個消息本身就只是個加上了 事務flag 的普通消息),然后由 TransactionalMessageService 來對這個消息進行額外處理。

首先會對該消息放入 real topic 屬性和 real queue 屬性,然后將消息 Topic 替換為用於處理所有事務消息的特殊的 Topic,當然該 Topic 對消費者是不可見的。

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
  MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
  MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                              String.valueOf(msgInner.getQueueId()));
  // 設置標記為未收到結果
  msgInner.setSysFlag(
    MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
  // 替換到特殊的 Topic (RMQ_SYS_TRANS_HALF_TOPIC)
  msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
  msgInner.setQueueId(0);
  msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
  return msgInner;
}

完成后,會送到 MessageStore 像普通消息一樣處理

普通消息的具體流程見 RocketMQ源碼詳解 | Broker篇 · 其二:文件系統


第三步

回到 Producer 端,在事務消息發送完成后,該方法會使用專門的線程池執行事務

// 2.執行本地事務,更新事務獲取狀態
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);

然后對本地的事務執行狀態進行處理,也就是將該執行狀態上報

this.endTransaction(msg, sendResult, localTransactionState, localException);

這里會發送一條 oneway 命令給 Broker 端,且使用的是 RequestCode.END_TRANSACTION 請求碼

// 事務結果報告(可能是 commit 或 rollback)
public static final int END_TRANSACTION = 37;

完成處理后,該方法會將事務的發送結果和本地事務的執行結構都返回給上層 API


第四步

在 Broker 端,這里會由 EndTransactionProcessor 處理器來處理該請求碼

然后,根據事務的執行結果來做不同的處理

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
  // 事務執行成功,嘗試完成事務

  // 獲取 half 消息
  result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
    if (res.getCode() == ResponseCode.SUCCESS) {
      // 將 half 消息取出,構造真實消息,然后投入實際上的 Topic
      /* pass */      
      RemotingCommand sendResult = sendFinalMessage(msgInner);
      
      if (sendResult.getCode() == ResponseCode.SUCCESS) {
        /*
         * 找到半消息,進行刪除
         * 刪除並不是物理上的刪除,因為物理上的刪除的代價十分的高昂,而是寫入一條具有相同事務id的消息到 op Topic
         */
        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
      }
      return sendResult;
    }
    return res;
  }
}

如果需要回滾,則對相應的半消息進行刪除,且和上面一樣,並不是物理上的刪除,而是發送具有相同事務 id 的消息到 OP Topic,來標記這個事務已經完成了(Commit/Rollback), OP Topic 也是一個特殊的 Topic,同樣對消費者不可見。

if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
  // 事務執行失敗,進行 half 消息的回滾

  // 首先找到 half 消息
  result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
    if (res.getCode() == ResponseCode.SUCCESS) {
      // 進行刪除
      this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    }
    return res;
  }
}

當這些都做完后,一次事務就完成了。


補償

當然啦,以上是順利的情況,我們當然不能指望事務每一次都能執行成功、網絡分區和宕機事件永遠不會發生。

在一段時間后,如果客戶端沒有對事務的狀態進行上報(或者上報的狀態不是 Commit 或 Rollback,而是 Unknown), Broker 端當然就要進行事務狀態的回查。

BrokerController 啟動的時候,會開啟事務狀態檢測服務,該服務會通過循環調用 TransactionalMessageServiceImpl.check() 方法,不斷的掃描未結束的事務,同時對超過指定時間還不知道狀態的事務進行回查操作。


check() 方法是事務回查的核心,由於很長,我們先來看第一部分(刪減了沒人在意的 Log)

// 首先找到存儲所有 half 消息的 Topic
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
// 對其中每一個 queue 進行檢查
for (MessageQueue messageQueue : msgQueues) {
  long startTime = System.currentTimeMillis();

  // 獲得對應的 op 消息所在的 queue
  MessageQueue opQueue = getOpQueue(messageQueue);
  // 獲取未處理的 half 消息的起始偏移量
  long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
  // 獲取 op 消息的 queue 的起始偏移量
  long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

  // 用來記錄已經被處理了的 op 消息的偏移量
  List<Long> doneOpOffset = new ArrayList<>();
  // 用來記錄已經完成了的 half 消息的偏移量
  // key: halfOffset, value: opOffset
  HashMap<Long, Long> removeMap = new HashMap<>();

  PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);

fillOpRemoveMap 方法中,主要是將 op 消息取出,來標記可以被移除的 half 消息(op 消息的存在代表對應事務的結束

/**
 * 讀取op消息,解析op消息,填充removeMap
 *
 * @param removeMap 要刪除的半消息,key: halfOffset,value: opOffset
 * @param opQueue Op message queue.
 * @param pullOffsetOfOp op message queue 的起始偏移量
 * @param miniOffset half message queue 的當前最小偏移量
 * @param doneOpOffset 存儲已處理的 op 消息
 * @return 獲取到的 Op 消息
 */
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
                                   MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
  // 首先通過 queue 獲取 op 消息,最大數量為 32 條
  PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
  
  /* pass: pullResult 消息的意外狀態的處理 */

  List<MessageExt> opMsg = pullResult.getMsgFoundList();
  for (MessageExt opMessageExt : opMsg) {
    // op 消息的 body 存儲的是對應的 half 消息的偏移量, 現在將其取出
    Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
    // 感覺這里的 Tag 並沒有什么意義,無論是 Commit 還是 Rollback 都會加入這個 Tag
    if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
      // 在 已處理偏移量 之前的話則可直接放入 已處理偏移量集合
      if (queueOffset < miniOffset) {
        doneOpOffset.add(opMessageExt.getQueueOffset());
      } else {
        // 否則放入需要移除的 half 的消息的集合
        removeMap.put(queueOffset, opMessageExt.getQueueOffset());
      }
    }
  }
  return pullResult;
}

然后進入到 check 方法的第二部分

while (true) {
  if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) break;
  
  // 推進最小已處理偏移量
  if (removeMap.containsKey(i)) /* 如果該 half 消息存在對應的 op 消息,說明已經被處理了(commit/rollback) */ {
    // 取出放入到已處理偏移量隊列
    Long removedOpOffset = removeMap.remove(i);
    doneOpOffset.add(removedOpOffset);

  } else /* 否則說明當前 half 消息懸而未決  */ {
    // 取出對應的半消息
    GetResult getResult = getHalfMsg(messageQueue, i);
		
    /* pass: 半消息不存在時的意外處理 */

    /*
     * 檢測是否要丟棄或跳過
     *   丟棄條件: 當前事務已經超過了最大回查次數(15次)
     *   跳過條件: 已經超過了過期文件最大保留時間(72小時)
     */
    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
      // 處理並推進偏移量
      // 具體的處理方法是: 投入 TRANS_CHECK_MAX_TIME_TOPIC 這個 Topic,等待手動處理
      listener.resolveDiscardMsg(msgExt);
      
      // 進入到下一個 half 消息
      newOffset = i + 1;
      i++;
      continue;
    }
    if (msgExt.getStoreTimestamp() >= startTime) {
      break;
    }

上面的方法很好理解,只是對於已經被標記結束的事務的處理、和未結束事務的補足


接下來是第三部分,這里將繼續對未結束事務的補足,與進行可能的回查操作

  // half 消息具有最小的檢查時間(免疫時間), 檢測時間以內可以跳過回查, 重新投入 half 消息的 Topic
  long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
  long checkImmunityTime = transactionTimeout;
  String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
  if (null != checkImmunityTimeStr) {
    checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
    if (valueOfCurrentMinusBorn < checkImmunityTime) {
      if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
        newOffset = i + 1;
        i++;
        continue;
      }
    }
  } else {
    if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
      break;
    }
  }


  /*
   * 對於當前事務的回查操作,需要滿足三個條件之一
   *  1.當前 op 消息的集合為空,且已經超過了最小檢查時間(免疫時間)
   *  2.最大偏移量的 op 消息的生成時間 已經超過了 最小檢查時間
   *  3.關閉最小檢查時間
   */
  List<MessageExt> opMsg = pullResult.getMsgFoundList();
  boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1);

  if (isNeedCheck) {
    // 先將當前 half 消息放回
    if (!putBackHalfMsgQueue(msgExt, i)) {
      continue;
    }
    // 然后向 Product 發送檢測消息
    listener.resolveHalfMsg(msgExt);
  } else {
    // 否則更新 op 消息集合,以確保能夠斷言該 half 消息的狀態
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    continue;
  }
}
 newOffset = i + 1;
 i++;
}

上面這段代碼主要圍繞 "是否進行回查" 展開,且涉及到 "免疫時間"。

在一個事務消息被發送后,對應事務的執行當然需要一定的執行時間,如果我們不設置這個時間立刻進行回查,那么很有可能時候事務還沒執行完,對於大多數情況下還沒執行完的事務進行回查,毫無疑問帶來的收益很低。所以我們需要設定一個時間,在這個時間內的事務先暫時不回查,這個時間就叫做"免疫時間"。

然后再來看下需要進行回查的三種情況:

  1. 當 op 消息的集合為空,說明當前還沒有收到讓當前事務結束的通知,且超過了"免疫時間",故回查
  2. 當前 op 消息最大偏移量的生成時間超過了"免疫時間",說明該事務的提交消息可能丟失了,故回查
  3. 不啟用 "免疫時間"

其中發送的回查消息的請求碼為 RequestCode.CHECK_TRANSACTION_STATE ,發送的也是 oneway 消息


最后的第四部分,同時更新 half 和 op 消息在 Queue 中的偏移量

// 對所有的 half 消息計算完成后,更新偏移量
if (newOffset != halfOffset) {
  transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
// 根據已經被標記為完成的 op 消息更新偏移量
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
  // 如果不等,說明並不是所有的 op 消息都被標記為完成了
  // 所以我們只將偏移量更新到第一個未完成的 op 消息的位置,其后面的 op 消息會在下次重復處理
  transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}


然后在 Producer 這邊,將由 ClientRemotingProcessor.checkTransactionState() 來處理回查操作

// 獲取事務 ID
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
    messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
    // 從 MQClientFactory 找到注冊的對應 Producer
    MQProducerInner producer = this.mqClientFactory.selectProducer(group);
    if (producer != null) {
        final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        // 讓 Producer 檢查在對應 IP 上的事務狀態
        producer.checkTransactionState(addr, messageExt, requestHeader);
    } else {
        log.debug("checkTransactionState, pick producer by group[{}] failed", group);
    }
} else {
    log.warn("checkTransactionState, pick producer group failed");
}

再進入 producer.checkTransactionState() 看看 Producer 是怎樣檢查事務狀態的

TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
// 取出當前 Producer 的事務監聽器
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
  LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
  Throwable exception = null;
  try {
    if (transactionCheckListener != null) {
      // 調用其的事務回查方法
      localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    } else if (transactionListener != null) {
      log.debug("Used new check API in transaction message");
      localTransactionState = transactionListener.checkLocalTransaction(message);
    }
  } catch (Throwable e) {
    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    exception = e;
  }

  // 再將事務執行結果其發回給 Broker
  this.processTransactionState(
    localTransactionState,
    group,
    exception);
} else {
  log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}

最后發回的方法做的事情和在一開始發送事務狀態的方法,所做的事情是一樣的。Broker 做的處理也是一樣的。

這樣,補償流程就執行完了。



批量消息

概念

在消息隊列中,批量消息也是一個重要的部分,將消息壓縮在一起發送不僅可以減少帶寬的消耗,還能節省頭部占用的空間。

有點失望的是,RocketMQ 對於批量消息的實現有點"粗糙"了


源碼流程

首先,在調用 send() 的 batch 版本后,會先對批量消息進行校驗

批量消息不允許延時、不允許發送到重試 Topic,且要求發送到的 Topic 必須是同一個 Topic

List<Message> messageList = new ArrayList<Message>(messages.size());
Message first = null;
for (Message message : messages) {
  if (message.getDelayTimeLevel() > 0) {
    throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
  }
  if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    throw new UnsupportedOperationException("Retry Group is not supported for batching");
  }
  if (first == null) {
    first = message;
  } else {
    if (!first.getTopic().equals(message.getTopic())) {
      throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
    }
    if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
      throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
    }
  }
  messageList.add(message);
}
MessageBatch messageBatch = new MessageBatch(messageList);

在校驗完成,且都放到一個 List 之后,接下來的步驟和普通的消息發送都差不多,只是在編碼上理所當然的存在着不同

public static byte[] encodeMessages(List<Message> messages) {
  //TO DO refactor, accumulate in one buffer, avoid copies
  List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
  int allSize = 0;
  for (Message message : messages) {
    // 編碼每一個消息
    byte[] tmp = encodeMessage(message);
    encodedMessages.add(tmp);
    allSize += tmp.length;
  }

  // 放到最后的大集合中
  byte[] allBytes = new byte[allSize];
  int pos = 0;
  for (byte[] bytes : encodedMessages) {
    System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
    pos += bytes.length;
  }
  return allBytes;
}

然后使用 RequestCode.SEND_BATCH_MESSAGE 這個狀態碼發送出去。


在 Broker 端,其投入的過程大體上和普通消息類似,但是其最后的持久化到硬盤時,這塊批量消息被拆分為了普通的單條消息。

即 RocketMQ 使用批量消息只減少了發送時的寬帶傳輸,對於存儲與交給消費者的部分並沒有獲得優化

// 拆分批量消息為每一個普通消息
while (messagesByteBuff.hasRemaining()) {
  // 1 TOTALSIZE
  final int msgPos = messagesByteBuff.position();
  final int msgLen = messagesByteBuff.getInt();
  final int bodyLen = msgLen - 40; //only for log, just estimate it
  
  /*  pass: 當作普通消息存儲   */
  
  queueOffset++;
  msgNum++;
  messagesByteBuff.position(msgPos + msgLen);
}


延時消息

概念

在業務中,有時候有一些延時提交任務的需求,這時候就可以使用延時消息,即在投遞一部分時間后才對消費者可見。

不過,在 RocketMQ 中,延遲級別並不支持自定義,而是具有固定的延遲級別。

不過商業版的 阿里雲MQ 可以支持秒精度的自定義延遲時間,果然是為了閹割社區版來賺錢嗎


源碼流程

RocketMQ 對於延時消息的處理主要在於 Broker 端,所以我們只需要看在 Broker 對延時級別的處理。

首先,在 CommitLog 的 put 中,會對延遲級別進行判斷,如果存在,會在這進行進行 Topic 的替換,將其存儲到對應的延遲級別的 Queue

if (msg.getDelayTimeLevel() > 0) {
  if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
  }

  topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
  queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

  // Backup real topic, queueId
  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
  msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

  msg.setTopic(topic);
  msg.setQueueId(queueId);
}

然后會被在 DefaultMessageStore 中初始化的 ScheduleMessageService 處理

首先,該服務在啟動時會進行初始化

public void start() {
  // 保證只被執行一次
  if (started.compareAndSet(false, true)) {
    // 加載本地快照
    super.load();
    this.timer = new Timer("ScheduleMessageTimerThread", true);
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
      // 取出每一個級別
      Integer level = entry.getKey();
      // 當前延遲級別對應的延遲時間
      Long timeDelay = entry.getValue();
      // 該延遲級別之前消費到的自己的隊列的偏移量
      Long offset = this.offsetTable.get(level);
      if (null == offset) {
        offset = 0L;
      }

      // 每一個延遲級別設置一個定時任務
      if (timeDelay != null) {
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
      }
    }

    // 定時持久化各個延遲級別的偏移量
    this.timer.scheduleAtFixedRate(new TimerTask() {

      @Override
      public void run() {
        try {
          if (started.get()) ScheduleMessageService.this.persist();
        } catch (Throwable e) {
          log.error("scheduleAtFixedRate flush exception", e);
        }
      }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
  }
}

每一個延遲級別的 Queue 都有對應的定時任務,且都會執行以下方法

public void executeOnTimeup() {
  // 找到自己延遲級別的消費隊列
  ConsumeQueue cq =
    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                                                                     delayLevel2QueueId(delayLevel));
  long failScheduleOffset = offset;
  if (cq != null) {
    // 根據消費偏移量將指定的 MappedFile 文件加載進來
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ != null) {
      try {
        long nextOffset = offset;
        int i = 0;
        // 遍歷每一個消息的索引
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
          long offsetPy = bufferCQ.getByteBuffer().getLong();
          int sizePy = bufferCQ.getByteBuffer().getInt();
          long tagsCode = bufferCQ.getByteBuffer().getLong();

          /* pass  */

          long now = System.currentTimeMillis();
          long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

          nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

          long countdown = deliverTimestamp - now;
          if (countdown <= 0) /* 目標時間小於當起時間,可以執行 */ {
            // 根據偏移量取出消息
            MessageExt msgExt =
              ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
              offsetPy, sizePy);

            if (msgExt != null) {
              try {
                // 將延遲消息恢復成原本消息的樣子
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                /* pass */
                
                // 投入真實的 Topic
                PutMessageResult putMessageResult =
                  ScheduleMessageService.this.writeMessageStore
                  .putMessage(msgInner);

                /* pass: 更新度量信息  */
              } catch (Exception e) {
                /* pass */
              }
            }
          } else /* 否則,這個消息需要被消費的時間到了再通知我 */ {
            ScheduleMessageService.this.timer.schedule(
              new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
              countdown);
            // 更新消費偏移量
            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
            return;
          }
        } // end of for

        // 走到這里,說明暫時沒有需要消費的延時消息
        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
        // 小睡一會
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
          this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
        return;
      } finally {
        bufferCQ.release();
      }
    } // end of if (bufferCQ != null)
    /* pass */
  } // end of if (cq != null)
  ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                                                failScheduleOffset), DELAY_FOR_A_WHILE);
}

可以看出,延遲消息的實現還是十分簡單的,由於先投入的延時消息必先快於后投入的消息的到期,所以只需要不斷的拉取各個延遲級別對應的隊列 的頭部的延遲消息即可。這也是只支持固定級別的延遲消息帶來的好處。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM