RocketMQ源碼詳解 | Consumer篇 · 其一:消息的 Pull 和 Push



概述

當消息被存儲后,消費者就會將其消費。

這句話簡要的概述了一條消息的最總去向,也引出了本文將討論的問題:

  • 消息什么時候才對被消費者可見?

    是在 page cache 中嗎?還是在落盤后?還是像 Kafka 一樣維護了一個 ISR 隊列,等到副本都將消息也落盤后才可見?

  • 消息如何到達消費者手里?

    是由 Broker push 過去嗎?還是由消費者自己 pull?

  • 怎樣知道消息消費到哪里?進度由誰管理?是可靠的嗎?

  • ...


本文接下來將從消費者的客戶端開始介紹,逐步回答以上問題



Pull

Client

RocketMQ 中,具有三個客戶端類:

  • DefaultMQPullConsumer(deprecated)
  • DefaultLitePullConsumer
  • DefaultMQPushConsumer

其中,前兩個為 Pull 類型,即由自己去拉取消息;后面一個是 Push 類型,即只需要設置好回調方法等,然后等待消息到來后進行調用該方法即可。

但實際上,他們的底層實現都是 pull,即都是由客戶端去 Broker 獲取消息。

這是因為,使用 Push 需要額外考慮一些問題,如消費者的消費速率慢於 Broker 的發送速率時會導致消費者的緩沖區滿,即使可以通過設置背壓(BackPressure)機制來做流量控制,但這樣毫無疑問會增加程序的復雜度。但如果是消費者按需拉取的話,則設計方法會簡便很多,也可以將消費者緩沖區滿的問題轉化為 Broker 消息堆積的問題。



我們首先來看第二個客戶端類 DefaultLitePullConsumer(第一個已經被 RocketMQ 標記為 deprecated,所以不進行介紹)

該 Pull 客戶端有兩種使用方式,第一種是 Assign,即由我們自己分配要拉取的 queue

// 獲取 Topic 的所有 Queue
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
// 將要訂閱的 queue 加入到 List
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
  assignList.add(list.get(i));
}
// 手動為這個消費者分配一個消息隊列列表
litePullConsumer.assign(assignList);
/*
 * 覆蓋消費者對於一個 queue 的消費偏移量
 * 如果針對同一個消息隊列多次調用此 API,則在下一次 poll() 中將使用最新的偏移量
 * 請注意,如果在消費過程中隨意使用該API,可能會丟失數據
 */
litePullConsumer.seek(assignList.get(0), 10);

第二種是訂閱,我們只需要注冊需要拉取的 Topic,對於從哪個 queue 拉取,我們並不關心,由系統自動分配

// 選擇從 queue 頭部開始 pull 還是從尾部開始 pull
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTestA", "*");

以下為兩種操作的內部實現

public synchronized void assign(Collection<MessageQueue> messageQueues) {
  setSubscriptionType(SubscriptionType.ASSIGN);
  assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
  if (serviceState == ServiceState.RUNNING) {
    updateAssignPullTask(messageQueues);
  }
}
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
    setSubscriptionType(SubscriptionType.SUBSCRIBE);
    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
    this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
    assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
    if (serviceState == ServiceState.RUNNING) {
      this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
      updateTopicSubscribeInfoWhenSubscriptionChanged();
    }
}

對於消息隊列的內部的分配類 AssignedMessageQueue ,Assign 會直接設置要訂閱的 queue,Subscribe 會通過設置 rebalance 服務來自動的更新訂閱的 queue。

由此可見,兩種模式的區別主要在於 rebalance 服務是否啟動。



DefaultLitePullConsumer 客戶端有以下重要的屬性

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {

  /**
   * pull 客戶端的實現類,幾乎全數操作都交由該類執行
   */
  private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;

  /**
   * 屬於同一個消費者組的消費者共享一個組 ID。然后,組中的消費者通過建立每個隊列
   * 僅由組中的單個消費者使用來盡可能公平地划分主題。如果所有的消費者都來自同一個組,
   * 它的作用就像一個傳統的消息隊列。每條消息只能由該組的一個消費者使用。
   * 當存在多個消費群體時,數據消費模型的流程與傳統的發布訂閱模型一致。
   * 消息被廣播到所有消費者組。
   */
  private String consumerGroup;

  /**
   * 長輪詢模式中,消費者連接最大掛起時間。
   * 不推薦修改
   */
  private long brokerSuspendMaxTimeMillis = 1000 * 20;

  /**
   * 長輪詢模式中,消費者最長的超時時間
   * 不推薦修改
   */
  private long consumerTimeoutMillisWhenSuspend = 1000 * 30;

  /**
   * socket 超時時間
   */
  private long consumerPullTimeoutMillis = 1000 * 10;

  /**
   * 消費模式,默認為集群
   */
  private MessageModel messageModel = MessageModel.CLUSTERING;
  /**
   * 當前 Consumer 的消息監聽器
   */
  private MessageQueueListener messageQueueListener;
  /**
   * 持久化維護的偏移量
   */
  private OffsetStore offsetStore;

  /**
   * 隊列分配策略
   *  - AllocateMachineRoomNearby,根據機房進行分配
   *  - AllocateMessageQueueAveragely,均分哈希策略
   *  - AllocateMessageQueueAveragelyByCircle,環形哈希策略
   *  - AllocateMessageQueueByConfig,根據配置進行分配
   *  - AllocateMessageQueueByMachineRoom,機房哈希
   *  - AllocateMessageQueueConsistentHash,一致性哈希
   *
   *  默認為均分策略
   */
  private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();

  /**
   * 是否開啟自動提交偏移量
   */
  private boolean autoCommit = true;

  /**
   * pull 的線程數量
   */
  private int pullThreadNums = 20;

  /**
   * 最小自動提交的時間間隔(毫秒)
   */
  private static final long MIN_AUTOCOMMIT_INTERVAL_MILLIS = 1000;

  /**
   * 最大自動提交的時間間隔(毫秒)
   */
  private long autoCommitIntervalMillis = 5 * 1000;

  /**
   * 每次拉取的最大消息數
   */
  private int pullBatchSize = 10;

  /**
   * 消費請求的流量控制閾值,每個消費者默認最多緩存 10000 個消費請求。
   * 緩存是指:已經拉取到了 Consumer, 但還沒被消息完成的消息
   * 考慮到 {@code pullBatchSize},瞬時值可能會超過這個限制
   */
  private long pullThresholdForAll = 10000;

  /**
   * 消費的最大偏移跨度
   */
  private int consumeMaxSpan = 2000;

  /**
   * Queue 級別的流量控制閾值,每個 queue 默認最多緩存 1000 條消息
   * 考慮到 {@code pullBatchSize},瞬時值可能會超過限制
   */
  private int pullThresholdForQueue = 1000;

  /**
   * 在隊列級別限制緩存的消息大小,每個 queue 默認最多緩存 100 MiB 消息
   * 考慮到 {@code pullBatchSize},瞬時值可能會超過限制
   *
   * 消息的大小僅包括消息的 body ,因此不准確
   */
  private int pullThresholdSizeForQueue = 100;

  /**
   * poll 的默認超時時間(毫秒)
   */
  private long pollTimeoutMillis = 1000 * 5;

  /**
   * 檢查 Topic 元數據更改的時間間隔
   */
  private long topicMetadataCheckIntervalMillis = 30 * 1000;

  /**
   * 剛進入 queue 時的消費位置,默認從尾部進行消費
   */
  private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

  /**
   * 消費回溯時間(?)(秒精度).
   * 默認為半小時前
   */
  private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
}

啟動

DefaultLitePullConsumer 在 Start 時,會先設置消費者組,然后調用剛剛說過的"內部實現類"的 start() 方法

以下是實際的啟動方法:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultLitePullConsumer.changeInstanceNameToPID();
            }

            // 向 MQClintFactory 注冊自己的消費者組
            initMQClientFactory();

            // 初始化再均衡服務
            initRebalanceImpl();

            initPullAPIWrapper();

            // 初始化內部偏移量持久化服務
            initOffsetStore();

            mQClientFactory.start();

            // 開啟調度服務
            startScheduleTask();

            this.serviceState = ServiceState.RUNNING;

            log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());

            // 啟動后置處理
            operateAfterRunning();

            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}
  1. 向全局的 MQClientFactroy 實例注冊自己的消費者組和實例

  2. 初始化 RebalanceImpl。向其注冊 消費者組、消費模式(集群或廣播)、隊列分配策略

  3. 創建消息 pull 執行類

  4. 初始化並加載消費偏移量持久化類

    • 廣播模式下使用 LocalFileOffsetStore 類,其會從本地文件加載存儲 queue 的消費偏移量

    • 集群模式下使用 RemoteBrokerOffsetStore 類,其會從 Broker 獲取 queue 的消費偏移量

  5. 啟動可能未啟動的 MQClientFactory

  6. 開啟調度服務

private void startScheduleTask() {
  // 監聽 Topic 下的 Queue 發生變化
  scheduledExecutorService.scheduleAtFixedRate(
    new Runnable() {
      @Override
      public void run() {
        try {
          fetchTopicMessageQueuesAndCompare();
        } catch (Exception e) {
          log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
        }
      }
    }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
}
  1. 將狀態標記為已啟動
  2. 進行后置操作
private void operateAfterRunning() throws MQClientException {
  // 如果在啟動之前就已經進行了 subscribe,則在初始化后更新 Topic 訂閱信息
  if (subscriptionType == SubscriptionType.SUBSCRIBE) {
    updateTopicSubscribeInfoWhenSubscriptionChanged();
  }
  // 如果在啟動之前就已經進行了 assign ,則在初始化后更新 pull 任務。
  if (subscriptionType == SubscriptionType.ASSIGN) {
    updateAssignPullTask(assignedMessageQueue.messageQueues());
  }

  // 獲取所有注冊了 queue change 監聽器的 Topic
  // 然后獲取 Topic 的 Queue, 並添加到本地
  for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
    Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
    messageQueuesForTopic.put(topic, messageQueues);
  }

  // 獲取並存儲所有需要訂閱的 Topic 所在的所有 Broker 的地址
  this.mQClientFactory.checkClientInBroker();
}

然后,該 Consumer 就初始化完成開始工作了


Poll

DefaultLitePullConsumerImpl#poll 方法的主要邏輯是這樣的:

public synchronized List<MessageExt> poll(long timeout) {
	// 自動提交選擇
  if (defaultLitePullConsumer.isAutoCommit()) {
    maybeAutoCommit();
  }

  // 等待消費請求
  ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

  // 更新內存中的偏移量
  if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
    List<MessageExt> messages = consumeRequest.getMessageExts();
    long offset = consumeRequest.getProcessQueue().removeMessage(messages);
    assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
    return messages;
  }
  return Collections.emptyList();
}

可以看出,這個方法主要是等待 consumeRequestCache 這個同步阻塞隊列被放入消息。那么是在哪里被放入的呢?


對於 Assign 模式,在上面講到的消費者的啟動中,第 8 步的后置操作的 updateAssignPullTask 方法中,會執行 startPullTask 方法。它為所有被分配的 Queue 都添加一個 Pull 任務

private void startPullTask(Collection<MessageQueue> mqSet) {
  for (MessageQueue messageQueue : mqSet) {
    if (!this.taskTable.containsKey(messageQueue)) {
      PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
      this.taskTable.put(messageQueue, pullTask);
      this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
    }
  }
}

對於 Subscribe 模式,他會由之前注冊的默認的 MessageQueueListener 來管理,其通過監聽 Queue 的變化來更新 Pull 任務

class MessageQueueListenerImpl implements MessageQueueListener {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
        switch (messageModel) {
            case BROADCASTING:
                updateAssignedMessageQueue(topic, mqAll);
                updatePullTask(topic, mqAll);
                break;
            case CLUSTERING:
                updateAssignedMessageQueue(topic, mqDivided);
                updatePullTask(topic, mqDivided);
                break;
            default:
                break;
        }
    }
}

updatePullTask 方法中,最后也是調用 startPullTask 來啟動任務。


所以最后,poll 實現的重點位置就在於 PullTaskImpl

而這塊的代碼比較長,所以我們跳過上半部的代碼,只簡單概括下:


上半部主要是對當前 Queue 的狀態進行多個判斷,當可能發生風險的時候直接添加下一次的延時任務,跳過當前狀況。

這里的需要自己添加任務的原因是:該任務調度線程池並不是一個定時觸發的線程池,而是一個延時任務線程池,所以需要在一次任務執行完成后再繼續添加新的任務

會發生跳過的狀況如下:

  1. Pull 任務已經取消
  2. 隊列長度 * 批量消息最大長度 > 最大緩存請求數量,說明可能超過閾值
  3. queue 的消息緩存數量超過限制
  4. queue 的消息緩存大小超過限制
  5. queue 的消息的最大時間與最小時間的差值,超過了指定了允許的差值
  6. 獲取偏移量失敗

當將所有的情況都判斷完成后,就開始執行 pull 和 將返回的消息包裝成 ConsumeRequest

long pullDelayTimeMills = 0;
try {
  // 創建訂閱信息
  SubscriptionData subscriptionData;
  String topic = this.messageQueue.getTopic();
  if (subscriptionType == SubscriptionType.SUBSCRIBE) {
    subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
  } else {
    subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
  }

  // 根據訂閱信息遠程 pull 消息
  PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());

  if (this.isCancelled() || processQueue.isDropped()) {
    return;
  }
  switch (pullResult.getPullStatus()) {
    case FOUND:
      // 獲取 queue 的鎖
      final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
      synchronized (objLock) {
        if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
          processQueue.putMessage(pullResult.getMsgFoundList());
          // 提交消費請求
          submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
        }
      }
      break;
    case OFFSET_ILLEGAL:
      log.warn("The pull request offset illegal, {}", pullResult.toString());
      break;
    default:
      break;
  }
  // 更新偏移量
  updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
} catch (Throwable e) {
  pullDelayTimeMills = pullTimeDelayMillsWhenException;
  log.error("An error occurred in pull message process.", e);
}

if (!this.isCancelled()) {
  scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
  log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}

在拉取消息,包裝為 ConsumeRequest 后,就會投入 consumeRequestCache,然后阻塞在 poll 那邊的消息就會被喚醒,然后將其返回給上層應用。這也就回答了我們上面提出的那個問題。


而上面的那塊的 pull() 方法,則會使用 PullAPIWrapper 來構造一個 pull 請求,然后交給 MQClientFactory 進入底層的 Netty 組件發送,具體的底層發送流程與邏輯我們已經在以下幾篇文章中討論過了



Commit

Client 的偏移量的提交實現比較簡單,這里只簡單進行描述。

在 comsumer 調用 commitSync 函數后,會根據當前的消費模式(廣播 or 集群)來做不同的操作

  • 集群

    此時,會將所有的消費隊列的偏移量存儲到另一個偏移量管理器,只是保存到內存。

  • 廣播

    廣播除了會保存到偏移量管理器外,還會像 Broker 的偏移量管理一樣持久化到磁盤。

集群模式下的持久化只會在 shutdown 的時候和 pull 時提交到 Broker



Broker

然后來看 Broker 端,在這邊,Broker 將 Pull 消息的請求碼注冊到了 PullMessageProcessor

該類對於 Pull 請求的處理的主要流程如下:

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
  throws RemotingCommandException {
  /*    pass      */

  // 取出訂閱配置
  SubscriptionGroupConfig subscriptionGroupConfig =
    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
  if (null == subscriptionGroupConfig) {
    response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
    return response;
  }

  /*    pass      */

  // 通過消費者組、Topic、Queue、offset、最大消息數、消息過濾器,從 MessageStore 取出消息
  final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
                                                       requestHeader.getTopic(),
                                                       requestHeader.getQueueId(),
                                                       requestHeader.getQueueOffset(),
                                                       requestHeader.getMaxMsgNums(),
                                                       messageFilter);

  if (getMessageResult != null) {
    response.setRemark(getMessageResult.getStatus().name());
    responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
    responseHeader.setMinOffset(getMessageResult.getMinOffset());
    responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

    // 設置建議的拉取 Broker 地址
    //  這是因為主 Broker 發生了消息堆積,所以交由從 Broker 去接管讀請求
    if (getMessageResult.isSuggestPullingFromSlave()) {
      responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
    } else {
      responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }

    switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
      case ASYNC_MASTER:
      case SYNC_MASTER:
        break;
      case SLAVE:
        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
          response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
          responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
        break;
    }

    // 允許從 salve broker 讀的話
    if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
      // 消費進度慢,重定向到另一台機器
      if (getMessageResult.isSuggestPullingFromSlave()) {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
      }
      // consume ok
      else {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
      }
    } else {
      responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }
  }

  /*    pass      */

  // 持久化偏移量
  boolean storeOffsetEnable = brokerAllowSuspend;
  storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
  storeOffsetEnable = storeOffsetEnable
    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
  if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                                                                  requestHeader.getConsumerGroup(),
                                                                  requestHeader.getTopic(),
                                                                  requestHeader.getQueueId(),
                                                                  requestHeader.getCommitOffset());
  }
  return response;
}

我們主要來看 取出消息 和 持久化偏移量 的實現


取出消息

取出消息的主要流程如下

int i = 0;
// 最大掃描消息大小為 16000 與 最大允許消息數量*20 的最大值
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
  // 先從 ConsumerQueue 獲取需要消費的消息索引
  long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
  int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
  long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

  maxPhyOffsetPulling = offsetPy;

  if (nextPhyFileStartOffset != Long.MIN_VALUE) {
    if (offsetPy < nextPhyFileStartOffset)
      continue;
  }

  boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

  // 檢查消息的總大小是否到達上限
  if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                          isInDisk)) {
    // 達到上限后即可直接返回
    break;
  }

  /*   pass   */
  
  // 消息過濾
  if (messageFilter != null
      && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
    if (getResult.getBufferTotalSize() == 0) {
      status = GetMessageStatus.NO_MATCHED_MESSAGE;
    }

    continue;
  }

  // 從 CommitLog 獲取消息
  SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

  if (null == selectResult) {
    if (getResult.getBufferTotalSize() == 0) {
      status = GetMessageStatus.MESSAGE_WAS_REMOVING;
    }

    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
    continue;
  }

   /*   pass   */

  this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
  // 添加到結果集
  getResult.addMessage(selectResult);
  status = GetMessageStatus.FOUND;
  nextPhyFileStartOffset = Long.MIN_VALUE;
}

 /*   pass   */

// 當消費進度落后物理內存的 40% 時,調換到從庫去處理讀
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                      * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

其中我們可以發現,我們必須要先從 ConsumerQueue 取出對應的消息,然后才進行拉取消息。

而我們在上一章討論過,ConsumerQueue 由 CommitLogDispatcher 分發后進行維護,即最快可以還在內存中時就可以構建出索引。

這便回答了我們在開頭提出的第一個問題:

  • 如果沒有同步的刷盤策略和副本同步策略的話,我們很快就能對其進行消費(即使消息還在 Page Cache 中)
  • 如果存在以上兩種策略任一的話,則需要先完成設定的策略,然后才能消費


持久化偏移量

然后是持久化偏移量,消費進度的管理由 Broker 內部維護,但在 Consumer 本地也會有進行管理,且以消費者的消費進度為主

這是因為我們在上面看到的一個特性所導致的,即在消費進度落后物理內存 40% 的時候,會交由從庫去讀。

這樣的特性導致了在主 Broker 中維護的偏移量發生了延遲,即使 salve broker 會通過定期上報偏移量的方法來維護,但難免存在落后。


偏移量的維護比較簡單,在 ConsumerOffsetManager 類內部具有一個 並發安全的 Map 來保存

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
  ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
  if (null == map) {
    map = new ConcurrentHashMap<Integer, Long>(32);
    map.put(queueId, offset);
    this.offsetTable.put(key, map);
  } else {
    Long storeOffset = map.put(queueId, offset);
    if (storeOffset != null && offset < storeOffset) {
      log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
    }
  }
}

而持久化管理則是在其繼承的 ConfigManager 抽象類中實現的

public synchronized void persist() {
  String jsonString = this.encode(true);
  if (jsonString != null) {
    String fileName = this.configFilePath();
    try {
      MixAll.string2File(jsonString, fileName);
    } catch (IOException e) {
      log.error("persist file " + fileName + " exception", e);
    }
  }
}

且為了保證修改時發生宕機后不會錯誤,其是在備份源文件后再進行寫入的

public static void string2File(final String str, final String fileName) throws IOException {

  String tmpFile = fileName + ".tmp";
  string2FileNotSafe(str, tmpFile);

  String bakFile = fileName + ".bak";
  String prevContent = file2String(fileName);
  if (prevContent != null) {
    string2FileNotSafe(prevContent, bakFile);
  }

  File file = new File(fileName);
  file.delete();

  file = new File(tmpFile);
  file.renameTo(new File(fileName));
}

最后持久化完成后,則可以在你的本地的持久化目錄中,中找到一個 json 文件,打開后就是下面這樣

{
  "offsetTable":{
    "%RETRY%my-consumer@my-consumer":{0:0},
    "RMQ_SYS_TRANS_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:22},
    "RMQ_SYS_TRANS_OP_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:4},
    "TopicTestA@lite_pull_consumer_test":{0:0}
  }
}

可以看到有我們自己創建的測試 Topic,和系統創建的重試 Topic 與事務有關 Topic




Push

Client

然后進入到第三個客戶端類 DefaultMQPushConsumer

對於這個客戶端類,其與上一個的區別在於,它注冊了一個 MessageListener ,且會在我們在之前看到的 start 中執行並發消費和順序消費的選擇。

// 消息的並行消費和順序消費
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
  this.consumeOrderly = true;
  this.consumeMessageService =
    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
  this.consumeOrderly = false;
  this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

並行消費和順序消費的區別只在於,順序消費會對消息所在 Queue 加鎖,等待上一批消息消費完成后,才會消費下一批消息;並行消費則不會加鎖


我們注冊的消息監聽器則會被 MQClintInstance 在初始化時啟動的 PullMessageService 服務所調用。

@Override
public void run() {
  while (!this.isStopped()) {
    try {
      PullRequest pullRequest = this.pullRequestQueue.take();
      this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
      log.error("Pull Message Service Run Method exception", e);
    }
  }
}

上面的 pullMessage() 方法就是 Push 實現的重點了(也很長)


該方法首先會根據當前度量標准判斷是否需要延遲拉取。延遲拉取的實現是,設置定時器將新的 PullRequest 在一段時間后重新投入 pullRequestQueue

具體的判斷方法就是我們在 Pull 已經講過的:

  1. Pull 任務已經取消
  2. ...

等狀況

pull 和 push 在這點的邏輯上是相同的


然后開始構建回調函數,該回調函數會在拉取消息后被調用,其主要做的是錯誤處理和提交消費請求

下面這段就是在回調函數中的"提交消費請求"的方法

// 提交消費請求
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
  pullResult.getMsgFoundList(),
  processQueue,
  pullRequest.getMessageQueue(),
  dispatchToConsume);

該方法根據消費是順序的還是並發的來做具體的實現


構建發送消息所需要的環境后,便和 Pull 一樣調用底層 Netty 組件發送

Push 模式構建的請求與 Pull 模式下構建的請求,區別幾乎只在這一段

// Pull 模式下構建的請求
PullSysFlag.buildSysFlag(false, block, true, false);
// Push 模式下構建的請求
PullSysFlag.buildSysFlag(
    commitOffsetEnable, // commitOffset
    true, // suspend
    subExpression != null, // subscription
    classFilter // class filter
);

可以看出,Pull 不會提交偏移量,且可能會 suspend,同時使用過濾表達式但不支持過濾器類模式

且由於發送的請求碼都是一致的,所以我們可以確定,Pull 和 Push 的實質上的區別在於 Push 會使用長輪詢


Broker

由於請求碼一樣,所以在 Broker 端的處理也和上面講過的代碼一樣。

在 Broker 對於 Push 的處理中,如果拉取的消息達到了要求的數量的話,則會直接返回,否則會進入到以下代碼

實際上,正如我們剛剛說的,Broker 並不能區分 Push 請求和 Pull 請求,它只是根據 SysFlags 上是否有 suspend 標識來選擇是否進入以下代碼塊,而 Pull 請求也是可以使用這個標識的

if (brokerAllowSuspend && hasSuspendFlag) {
  long pollingTimeMills = suspendTimeoutMillisLong;
  // 開啟長輪詢則使用,否則為短輪詢
  if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
    pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
  }

  String topic = requestHeader.getTopic();
  long offset = requestHeader.getQueueOffset();
  int queueId = requestHeader.getQueueId();
  // 構建 PullRequest
  PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
  this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
  response = null;
  break;
}

該塊會構建 PullRequest 然后等待 Broker 的獲取


Broker 的 存儲組件層 會在將 reputed 指針推進時獲取 PullReqeust 並進行處理

reputed 指針的介紹與推進 可以看這篇文章:RocketMQ源碼詳解 | Broker篇 · 其三:CommitLog、索引、消費隊列



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM