RocketMQ源碼詳解 | Producer篇 · 其一:Start,然后 Send 一條消息


概述


DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.start();

try {
  /*
   * Create a message instance, specifying topic, tag and message body.
   */
  Message msg = new Message("TopicTest" /* Topic */,
                            "TagA" 			/* Tag */,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                           );
  
  /*
   * Call send message to deliver message to one of brokers.
   */
  SendResult sendResult = producer.send(msg);
  System.out.printf("%s%n", sendResult);
} catch (Exception e) {
  e.printStackTrace();
}

/*
 * Shut down once the producer instance is not longer in use.
 */
producer.shutdown();



Start

DefaultMQProducer#start

首先進入 start 方法,可以看出主要的功能實現在於 defaultMQProducerImpl.start(),先忽略細枝末節,接着進去看看

public void start() throws MQClientException {
  this.setProducerGroup(withNamespace(this.producerGroup));
  this.defaultMQProducerImpl.start();
  if (null != traceDispatcher) {
    try {
      traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    } catch (MQClientException e) {
      log.warn("trace dispatcher start failed ", e);
    }
  }
}

DefaultMQProducerImpl#start()

然后,我們可以看到會根據當前生產者的狀態來進行不同的行為

記得在設計模式里,這叫做"狀態模式"

具體的狀態有:

  • CREATE_JUST
  • RUNNING
  • START_FAILED
  • SHUTDOWN_ALREADY

在進入 start 后狀態會變成 START_FAILED ,完成后變成 RUNNING 狀態

public void start() throws MQClientException {
  this.start(true);
}

public void start(final boolean startFactory) throws MQClientException {
  switch (this.serviceState) {
    case CREATE_JUST:
      this.serviceState = ServiceState.START_FAILED;
      // -- 跳過 --
      this.serviceState = ServiceState.RUNNING;
      break;
    case RUNNING:
    case START_FAILED:
    case SHUTDOWN_ALREADY:
      throw new MQClientException("The producer service state not OK, maybe started once, "
                                  + this.serviceState
                                  + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                  null);
    default:
      break;
  }
}

進入 CREATE_JUST 后開始對元信息進行檢查與注冊

this.checkConfig();

// 如果自定義了生產者組,則修改PID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  this.defaultMQProducer.changeInstanceNameToPID();
}

// 從MQ工廠獲取實例
// MQ工廠保證ClientID唯一
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

// 注冊生產者組
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
  this.serviceState = ServiceState.CREATE_JUST;
  throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                              + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                              null);
}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
  mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), 
         this.defaultMQProducer.isSendMessageWithVIPChannel());

然后具體看 mQClientFactory.start() 方法


MQClientInstance#start

MQClientInstance 是由一個 JAVA 程序所共用的,其可以從 ClientId 的生成方法看出

public String buildMQClientId() {
  StringBuilder sb = new StringBuilder();
  sb.append(this.getClientIP());

  sb.append("@");
  sb.append(this.getInstanceName());
  if (!UtilAll.isBlank(this.unitName)) {
    sb.append("@");
    sb.append(this.unitName);
  }

  return sb.toString();
}

由以上代碼可以得知,一台機器上的一個 JVM 進程只擁有一個實例,所以以下的初始化方法也是全局的


首先對當前對象加鎖來避免多線程帶來的問題,然后又進行了一次狀態判斷來保證狀態正確。然后就啟動了一堆服務。

synchronized (this) {
  switch (this.serviceState) {
    case CREATE_JUST:
      this.serviceState = ServiceState.START_FAILED;
      // NameSrv 地址為空時,嘗試通過設定的地址使用HTTP獲取NameSrv地址
      if (null == this.clientConfig.getNamesrvAddr()) {
        this.mQClientAPIImpl.fetchNameServerAddr();
      }
      // 開啟 Netty 的請求響應的 Channel
      this.mQClientAPIImpl.start();
      // 開啟調度任務
      this.startScheduledTask();
      // 開啟拉取服務
      this.pullMessageService.start();
      // 開啟再均衡服務
      this.rebalanceService.start();
      // 開啟push服務
      this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
      log.info("the client factory [{}] start OK", this.clientId);
      this.serviceState = ServiceState.RUNNING;
      break;
    case START_FAILED:
      throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
    default:
      break;
  }
}

其中的 pullMessageServicerebalanceService 等服務都是繼承於 ServiceThread 抽象類,這個類被用於在多線程的情況下保證服務啟動的正確性。

public void start() {
  log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
  // 只會被啟動一次
  if (!started.compareAndSet(false, true)) {
    return;
  }
  stopped = false;
  // 啟動子類的實現線程,從子類獲取服務名
  this.thread = new Thread(this, getServiceName());
  this.thread.setDaemon(isDaemon);
  this.thread.start();
}

其中 RebalanceServicePullMessageService 我們在其他章節再具體分析。


此外,其中的 startScheduledTask() 又開啟了一些定時運行的任務

// 從遠程服務器不斷更新 NameServer 地址
if (null == this.clientConfig.getNamesrvAddr()) {
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
      try {
        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
      } catch (Exception e) {
        log.error("ScheduledTask fetchNameServerAddr exception", e);
      }
    }
  }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

// 定時從NameServer更新Topic的路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  @Override
  public void run() {
    try {
      MQClientInstance.this.updateTopicRouteInfoFromNameServer();
    } catch (Exception e) {
      log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
    }
  }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

// 定期清除離線的Broker地址,同時發送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  @Override
  public void run() {
    try {
      MQClientInstance.this.cleanOfflineBroker();
      MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
    } catch (Exception e) {
      log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
    }
  }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

// 持久化所有的擁有的消費者偏移量
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  @Override
  public void run() {
    try {
      MQClientInstance.this.persistAllConsumerOffset();
    } catch (Exception e) {
      log.error("ScheduledTask persistAllConsumerOffset exception", e);
    }
  }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

// 動態對所有消費者的線程池容量進行調整
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  @Override
  public void run() {
    try {
      MQClientInstance.this.adjustThreadPool();
    } catch (Exception e) {
      log.error("ScheduledTask adjustThreadPool exception", e);
    }
  }
}, 1, 1, TimeUnit.MINUTES);

從這里可以看出,消費者不僅是在內存保存了偏移量,還會定期持久化以保證不丟失。

運行這些任務的,是一個 SingleThreadScheduledExecutor ,這是一個由一個線程去執行需要被定時執行的任務的線程池。


在完成以上任務后,回到我們的 DefaultMQProducerImpl.start() 方法看剩下的兩段

// 為所有Broker發送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

// 開啟所有的調度任務
this.startScheduledTask();

首先來看第一個方法

public void sendHeartbeatToAllBrokerWithLock() {
  if (this.lockHeartbeat.tryLock()) {
    try {
      // 發送心跳,但第一次的時候是空的,所以不用考慮
      this.sendHeartbeatToAllBroker();
      // 上傳過濾器Class,消費者相關
      this.uploadFilterClassSource();
    } catch (final Exception e) {
      log.error("sendHeartbeatToAllBroker exception", e);
    } finally {
      this.lockHeartbeat.unlock();
    }
  } else {
    log.warn("lock heartBeat, but failed. [{}]", this.clientId);
  }
}

在發送心跳的時候,由於這時候還沒有從 NameServer 獲取 Broker 地址,所以不會發送,而上傳過濾器 Class 我們留到消費者的章節再講。


第二個方法比較簡單,開啟一個調度任務來處理所有的 Request 狀態,對異步的請求根據狀態處理回調函數。

private void startScheduledTask() {
  // 原子增加生產者數量
  if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          // 處理異步回調請求,掃描所有Request狀態
          RequestFutureTable.scanExpiredRequest();
        } catch (Throwable e) {
          log.error("scan RequestFutureTable exception", e);
        }
      }
    }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
  }
}

需要注意的是,這個異步請求指的並不是在 send 中的異步回調機制,而是在 rockmq-4.7.0 后加入的 Request-Reply 特性,用來支持 RPC 調用。

實現的方法是:

  1. Producer 投遞消息到 Broker,然后阻塞直到收到 返回結果
  2. Broker 收到后像正常消息一樣 ack,Consumer 通過 pull\push 從 Broker 中獲取
  3. Consumer 獲取后進行處理,然后響應 返回結果
  4. Broker 收到 返回結果 后將其發回對應的 Producer

核心的方法與類有:

DefaultMQProducerImpl#request()
RequestResponseFuture#waitResponseMessage()#putResponseMessage()
MessageUtil#createReplyMessage()
ReplyMessageProcessor#processReplyMessageRequest()
ClientRemotingProcessor#receiveReplyMessage()

自此,Start 方法就完成了。


Start 總體流程

  1. 設置生產者組

  2. 檢查當前狀態,只允許為 CreateJust

  3. 從 MQClientManage 獲取 MQClient 工廠

    3.1 已經被創建則直接返回

  4. 注冊生產者組

  5. 啟動 MQClient 工廠

    5.1 NameSrv 地址為空時,嘗試通過設定的地址使用HTTP獲取NameSrv地址

    5.2 開啟 Netty 的請求響應的 Channel

    5.3 開啟調度任務

     5.3.1 從遠程服務器不斷更新 NameServer 地址

     5.3.2 定時從NameServer更新Topic的路由信息

     5.3.3 定期清除離線的Broker地址,同時發送心跳

     5.3.4 持久化所有的擁有的消費者偏移量

     5.3.5 動態對所有消費者的線程池容量進行調整

    5.4 開啟拉取服務

    5.5 開啟再均衡服務

    5.6 開啟push服務

  6. 啟動 trace dispatcher 服務




Send

進入 send 后會進行完整檢查,且默認工作模式為 Sync,send 的主要工作的方法如下


DefaultMQProducerImpl#sendDefaultImpl

這個方法很長,我們一段段來看

// 確保是 Start 狀態
this.makeSureStateOK();

Validators.checkMessage(msg, this.defaultMQProducer);

final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;

// 得到Topic信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

// 返回為Null的結果只有Topic不存在且自動創建Topic沒有打開
if (topicPublishInfo != null && topicPublishInfo.ok()) {  /* pass */  }

// 檢查下是不是NameSev地址填錯了
validateNameServerSetting();

throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

首先是確保狀態正確,然后記錄啟動時間作為度量和超時檢查,然后調用 tryToFindTopicPublishInfo 獲得具體的 Topic 信息以發送消息,如果找不到則拋異常。

然后進入 tryToFindTopicPublishInfo 看看具體實現

// 尋找消息應該被發到哪
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

  // 本地沒有、或未就緒,從NameServ請求
  if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    // 沒有信息則從NameServ拉取
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
  }

  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
  } else {
    // 拉取不到?說明我們要發送的目標Topic不存在
    // 那就打開isDefault開關,向默認Topic發送
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
  }
}

這個方法可以看到,在本地會有一個本地 Topic 表,沒有會嘗試去 NameServer 拉取。

而拉取分為兩階段,第一次拉取會去找對應的 Topic ,失敗則第二次會去找 Default Topic。為什么會這樣做呢?我們都知道自動創建 Topic 只會在 Broker 打開自動創建 Topic 的開關才有效,而具體的實現方法需要我們再往下看


MQClientInstance#updateTopicRouteInfoFromNameServer

又進入了 MQClientInstance ,我們剛剛已經了解到它會由 MQClientManager 創建一個全局的實例,而它內部有幾個重要的 Map。

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable;
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable;
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable;
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable;
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable;
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable;

從變量名和和注釋我們不難理解它們是做什么的。我們可以發現, Client 的元信息是由所有的消費者和生產者共享。

回歸正題,接着看更新路由信息的方法

TopicRouteData topicRouteData;
// 使用默認 Topic ,因為目標Topic不存在,所以需要新建
if (isDefault && defaultMQProducer != null) {
  // 獲取默認Topic路由信息
  topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                                                               1000 * 3);
  // 獲得了信息后,接下來都是新的Topic都是繼承於默認Topic的信息

  if (topicRouteData != null) {
    // 修正讀寫Queue數量
    for (QueueData data : topicRouteData.getQueueDatas()) {
      int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
      data.setReadQueueNums(queueNums);
      data.setWriteQueueNums(queueNums);
    }
  }
} else {
  topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}

這里分為直接從 NameServer 獲取和獲取 default topic 后對其元信息進行繼承。

關於 ReadQueue 和 WriteQueue

可讀 Queue 代表消費者可以讀取的 Queue,可寫 Queue 代表生產者可以寫入的 Queue。

將它們進行分離的主要是為了方便的動態調整 Queue 的大小

其中 getTopicRouteInfoFromNameServer 主要是 Netty 使用 RPC 獲取 Topic,具體內容在其他章看看


在獲取了路由信息后,就開始對其進行檢查更改的項目

if (topicRouteData != null) {
  // 檢查路由信息是否發生改變
  TopicRouteData old = this.topicRouteTable.get(topic);
  boolean changed = topicRouteDataIsChange(old, topicRouteData);
  if (!changed) {
    changed = this.isNeedUpdateTopicRouteInfo(topic);
  } else {
    log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
  }

  // 發生改變則進行更改
  if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

    // 更新路由地址
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
      this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }

    // Update Pub info
    {
      TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
      publishInfo.setHaveTopicRouterInfo(true);
      Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
      while (it.hasNext()) {
        Entry<String, MQProducerInner> entry = it.next();
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
          // 在這,被新建的Topic放入了本地生產者表
          impl.updateTopicPublishInfo(topic, publishInfo);
        }
      }
    }

    // Update sub info
    {
      Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
      Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
      while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
          impl.updateTopicSubscribeInfo(topic, subscribeInfo);
        }
      }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    // 重新放入
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
  }
} else {
  log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}

如果是 default topic 的話,就會在這一步放入本地。

這是因為,在 Broker 中,如果開啟了自動創建 Topic 的選項,便會創建一個和 default 同名(default 的默認名稱為 TBW102)的 Topic,並以這個 Topic 來創建新的 Topic ,但若沒有開啟,則會因為找不到 TopicName 而返回錯誤。


DefaultMQProducerImpl#sendDefaultImpl

我們再回到發送方法來,在獲取到要發送的 Topic 的元信息后,就可以開始發送了

boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;

// 最多發送timesTotal次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;

String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
  String lastBrokerName = null == mq ? null : mq.getBrokerName();
  // 選擇一個Queue
  MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
  if (mqSelected != null) {
    /*   pass   */
  } else {
    break;
  }
}

在這里,Producer 會根據發送類型來選擇發送次數,同步會選擇默認的重試次數加一,而異步和oneway則只會嘗試一次

在發送流程中,首先需要選擇 Topic 中的 Queue


MQFaultStrategy#selectOneMessageQueue

從類名可以看出,這個類主要實現故障退避功能,同時使用輪詢的方式來選擇 Queue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  // 故障退避: 當要發送的Broker在上一次發送中延遲了較久的時間或發送失敗,會進行一段時間的退避
  if (this.sendLatencyFaultEnable) {
    try {
      // 首先,增加線程本地的輪詢計數
      int index = tpInfo.getSendWhichQueue().incrementAndGet();
      for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
        if (pos < 0)
          pos = 0;
        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
        // 如果可用(不需要進行退避),則直接使用
        if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
          return mq;
      }

      // 在所有Broker都需要退避的情況下,即沒有最優解,選擇次優
      final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
      int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
      if (writeQueueNums > 0) {
        final MessageQueue mq = tpInfo.selectOneMessageQueue();
        if (notBestBroker != null) {
          mq.setBrokerName(notBestBroker);
          mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
        }
        return mq;
      } else {
        // 如果可寫 Queue 已經為零,說明已經不在了
        latencyFaultTolerance.remove(notBestBroker);
      }
    } catch (Exception e) {
      log.error("Error occurred when selecting message queue", e);
    }

    return tpInfo.selectOneMessageQueue();
  }

  // 如果沒有開啟開關,則選擇一個不是上一次發送的Broker來發送
  return tpInfo.selectOneMessageQueue(lastBrokerName);
}

大多數內容直接看代碼就能理解,所以我們主要看下 pickOneAtLeast 方法


LatencyFaultToleranceImpl#pickOneAtLeast

這個方法用於在所有 Broker 都需要"故障退避"的時候,選擇一個可能最好的

public String pickOneAtLeast() {
  final Enumeration<FaultItem> elements = this.faultItemTable.elements();
  List<FaultItem> tmpList = new LinkedList<FaultItem>();
  while (elements.hasMoreElements()) {
    final FaultItem faultItem = elements.nextElement();
    tmpList.add(faultItem);
  }

  if (!tmpList.isEmpty()) {
    Collections.shuffle(tmpList);

    Collections.sort(tmpList);

    final int half = tmpList.size() / 2;
    if (half <= 0) {
      return tmpList.get(0).getName();
    } else {
      final int i = this.whichItemWorst.incrementAndGet() % half;
      return tmpList.get(i).getName();
    }
  }

  return null;
}

這個方法主要的策略是:

對當前的處於"故障退避"狀態的 Broker 使用洗牌算法(這有啥意義...),然后進行排序,然后選擇所有元素的前半段使用輪詢策略

排序方案:是否可用 > 上次響應時間(短>長)(發生網絡分區的時候會相同)> 未來可用時間點(近>遠)


DefaultMQProducerImpl#sendDefaultImpl

選擇完 Queue 后,就要進行實際的發送了

mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
  beginTimestampPrev = System.currentTimeMillis();
  if (times > 0) {
    // 在重新發送期間使用命名空間重置主題
    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
  }

  // 消耗時間的度量
  long costTime = beginTimestampPrev - beginTimestampFirst;
  if (timeout < costTime) {
    callTimeout = true;
    break;
  }

  // 發送MSG
  sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);


  endTimestamp = System.currentTimeMillis();
  // 更新故障退避功能
  this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

  // 根據發送方式選擇返回結果
  switch (communicationMode) {
    case ASYNC:
      return null;
    case ONEWAY:
      return null;
    case SYNC:
      if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
          continue;
        }
      }
      return sendResult;
    default:
      break;
  }
} catch (XXX e) {
	/* 這里刪除了一堆Expection catch, 它們都主要做了更新"故障退避", 然后拋出 */
}

可以看出,實際的發送方法是 sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  if (null == brokerAddr) {
    // 不在內存中,嘗試獲取或創建
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  }

  SendMessageContext context = null;
  if (brokerAddr != null) {
    // 是否使用vip,broker有兩個端口共同服務
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

    byte[] prevBody = msg.getBody();
    try {
      // 對於MessageBatch,生成過程中已經設置了ID
      if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
      }

      // 將實例名設置為命名空間
      boolean topicWithNamespace = false;
      if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
        msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
        topicWithNamespace = true;
      }

      // 是否為壓縮消息
      int sysFlag = 0;
      boolean msgBodyCompressed = false;
      if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
      }

      final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
      if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
      }

      // 發送消息的校驗鈎子
      if (hasCheckForbiddenHook()) {
        CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
        checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
        checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
        checkForbiddenContext.setCommunicationMode(communicationMode);
        checkForbiddenContext.setBrokerAddr(brokerAddr);
        checkForbiddenContext.setMessage(msg);
        checkForbiddenContext.setMq(mq);
        checkForbiddenContext.setUnitMode(this.isUnitMode());
        this.executeCheckForbiddenHook(checkForbiddenContext);
      }

      // 發送消息前的鈎子
      if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        context.setNamespace(this.defaultMQProducer.getNamespace());
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (isTrans != null && isTrans.equals("true")) {
          context.setMsgType(MessageType.Trans_Msg_Half);
        }

        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
          context.setMsgType(MessageType.Delay_Msg);
        }
        this.executeSendMessageHookBefore(context);
      }

      // 組裝消息頭
      SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
      requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
      requestHeader.setTopic(msg.getTopic());
      requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
      requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
      requestHeader.setQueueId(mq.getQueueId());
      requestHeader.setSysFlag(sysFlag);
      requestHeader.setBornTimestamp(System.currentTimeMillis());
      requestHeader.setFlag(msg.getFlag());
      requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
      requestHeader.setReconsumeTimes(0);
      requestHeader.setUnitMode(this.isUnitMode());
      requestHeader.setBatch(msg instanceof MessageBatch);
      // 發往重發Topic的消息
      if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
          requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
          MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }

        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
          requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
          MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
      }

      SendResult sendResult = null;
      switch (communicationMode) {
        case ASYNC:
          Message tmpMessage = msg;
          boolean messageCloned = false;
          if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
          }

          // 從命名空間中解包
          if (topicWithNamespace) {
            if (!messageCloned) {
              tmpMessage = MessageAccessor.cloneMessage(msg);
              messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
          }

          // 超時檢查
          long costTimeAsync = System.currentTimeMillis() - beginStartTime;
          if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
          }
          // 發送異步消息
          sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
          break;
        case ONEWAY:
        case SYNC:
          long costTimeSync = System.currentTimeMillis() - beginStartTime;
          if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
          }
          sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
          break;
        default:
          assert false;
          break;
      }

      // 發送消息后的鈎子
      if (this.hasSendMessageHook()) {
        context.setSendResult(sendResult);
        this.executeSendMessageHookAfter(context);
      }

      return sendResult;
    } catch (RemotingException e) {
      if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
      }
      throw e;
    } catch (MQBrokerException e) {
      if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
      }
      throw e;
    } catch (InterruptedException e) {
      if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
      }
      throw e;
    } finally {
      msg.setBody(prevBody);
      msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
    }
  }

  throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

這個方法干了不少臟活,不過發送的具體實現還是通過 MQClientAPIImpl.sendMessage 來實現。



send 發送流程

  1. 檢查消息

  2. 從消息的目的 TopicName 中獲取元信息;若獲取不到 Topic,則拋異常

    2.1 從本地獲取,沒有則從 NameServer 獲取

     2.1.1 從 NameServer 獲取 Topic 元信息,沒有則直接返回

     2.1.2 更新獲取的 Topic 的路由信息

    2.2 將獲取的 Topic 直接返回,若 NameServer 也沒,則進行創建

     2.2.1 獲取默認 Topic;獲取失敗直接返回

     2.2.2 繼承該 Topic 的信息來進行更改以作為新 Topic

  3. 從 Topic 中選擇 Queue

    3.1 排除掉在故障退避的 Broker 后,將下一個 Broker 所在的 Queue 返回

    3.2 所有 Broker 都需要退避下,選擇次優 Broker

  4. 發送消息;失敗則退回第三步

    4.1 Vip 檢查

    4.2 消息類型檢查

    4.3 調用鈎子

    4.4 組裝消息頭

    4.5 發送消息

  5. 更新故障退避信息

  6. 根據發送方式返回結果




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM