RocketMQ(八):消息發送


匠心零度 轉載請注明原創出處,謝謝!

RocketMQ網絡部署圖

  • NameServer:在系統中是做命名服務,更新和發現 broker服務。
  • Broker-Master:broker 消息主機服務器。
  • Broker-Slave: broker 消息從機服務器。
  • Producer: 消息生產者。
  • Consumer: 消息消費者。

說明: rocketmq系列都將會以rocketmq-4.1.0-incubating進行介紹。

在閱讀源碼時做了一定的注釋,公眾號【匠心零度】回復:rocketmq,可獲得基於rocketmq4.1.0加詳細中文代碼注釋 。歡迎大家 star、fork !

廝大說過消息中間件的本質消息中間件大道至簡:一發一存一消費 ,今天主要來討論下,就是RocketMQ網絡部署圖中用顏色標記的部分。

往期rocketmq系列文章

消息發送概述

上面的圖大概就是producer發送message到broker的核心邏輯了。

問題思考:

把broker相關信息緩存到客戶端減少了與namesrv的交互,但是也降低了broker變化的實時性了,如何忽然有一台broker不可用了會怎么樣呢?(后續看看rocketmq的處理),為什么producer發送會那么快呢?本質是由於netty的writeAndFlush?producer如何做到異步發送?同步發送?oneway發送的呢?如果發送失敗會怎么處理呢?

消息發送一般流程分析

由於發送還涉及到定時發送,順序發送,批量發送等情況,本篇考慮到篇幅問題就是一般的發送邏輯講解,后面繼續分享其他情況。

閱讀本篇前應該重點閱讀下:RocketMQ(二):RPC通訊

如何在本地調試之前文章也分享過了,在此就不提了,發送的邏輯相對於存儲以及消費來說是最簡單的(直接根據一條線不斷的跟下去基本就差不多了),而存儲最復雜,其次消費(這些過程可能一條線不好找,后續分享)。

同步發送寫法

備注: 可以參考RocketMQ快速入門即可。

producer.start

    /**
     * Start this producer instance.
     * </p>
     *
     * <strong>
     * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
     * this method before sending or querying messages.
     * </strong>
     * </p>
     *
     * @throws MQClientException if there is any unexpected error.
     */
    @Override
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
    }

主要做了下列事情(核心事情):

  • 一些配置檢查。
  • 構建與namesrv通信的netty客戶端。
  • 默認每30s與namesrv交換獲取broker相關信息。
  • 默認每30s去掉失效的broker信息以及發送心跳到所有broker上面。

構建Message對象

producer是以Message對象進行發送的,看看Message構造:

    public Message() {
    }

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

備注: 主要就是topic、tags、以及body真實內容等。

send發送

SendResult sendResult = producer.send(msg);

進行發送處理。下面我們重點看看send如何處理。

發送send核心分析

發送的幾種方式:同步 異步 oneway(應該選擇哪種,需要自己根據情況進行判斷)

以同步發送為例子,默認超時時間為3s,

SendResult sendResult = producer.send(msg);

這個就是發送的觸發方法,我們一直跟進去就行了,第一初步感受:通過跟蹤進去第一感覺就是涉及到了JUC相關使用,大量運用享元模式(本質一個map進行緩存)以及netty使用。

核心邏輯:

代碼就不大量復制了,需要的github里面獲取基於rocketmq4.1.0加詳細中文代碼注釋 。歡迎大家 star、fork !

  • 判斷服務是否可用? 不可用直接結束流程。

  • 消息的驗證:

  • 獲取topic路由信息

    緩存中有就獲取,沒有就namesrv交互一次(也可能2次)由於topic信息在broker服務端不一定存在,如果不存在就用默認的(TBW102)。

封裝請求頭信息:

// Namesrv 根據Topic獲取Broker Name、隊列數(包含讀隊列與寫隊列)
 public static final int GET_ROUTEINTO_BY_TOPIC = 105;

namesrv服務端接受到這個請求的處理情況。

最后得到的路由信息類似下面的:

  • 發送模式是sync 會有3次其他1次

    //發送模式是sync 會有3次其他1次
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    
  • 選擇一個queue

    如何選擇發送那個broker的那個queueid上面?(客戶端自己負載),由於broker相關信息緩存在客戶端里面,問題來了(由於30s會同步一次信息,那么在30s之內broker出現問題會怎么樣呢? )rocketmq是這樣處理的:sendLatencyFaultEnable開關是否打開

    1.打開--> 有多長時間內不可用情況

    2.不打開(默認)-->直接隨機一個(如果帶了lastBrokerName不為空 盡量換不是這個broker的,如果都沒有又是隨機一個)

  • 調用sendKernelImpl發送消息 發送消息核心

    根據broker的name獲取到ip地址,如果通道沒有建立並且保存。

    設置設置UNIQ_id,里面保護客戶端ip地址信息。

    發送的時候 會有鈎子函數提供執行(禁止消息鈎子 ,發送消息鈎子(executeSendMessageHookBefore、executeSendMessageHookAfter)。

    構建SendMessageRequestHeader,包括生成消息時間戳,所以各各機器時間最好一致,(這樣后期也可以查下broker接受消息花了多少時間)。

  • 根據發送消息模式,選擇發送方式

    下面這次主要看同步發送情況。

    如果1情況執行nettywriteAndFlush發送成功者跳出來,到達3情況進行等等最多等待3s。這里什么時候喚醒呢? 其實是在broker情況響應客戶端的時候進行喚醒的:

    備注: 這里使用CountDownLatch異步轉同步的。

    如果是2情況表示發送失敗,直接喚醒3情況不進行阻塞了(最后拋異常表示發送失敗)

  • 更新broker可用時間

  • retryAnotherBrokerWhenNotStoreOK情況判斷

    如果設置為retryAnotherBrokerWhenNotStoreOK為true之后,在發送失敗的時候,會選擇換一個broker。

  • 如下異常continue,進行發送消息重試

客戶端發送流程大概到這里就分析完成了。


如果讀完覺得有收獲的話,歡迎點贊、關注、加公眾號【匠心零度】,查閱更多精彩歷史!!!

加入知識星球,一起探討!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM