RocketMQ架構原理解析(四):消息生產端(Producer)


RocketMQ架構原理解析(一):整體架構
RocketMQ架構原理解析(二):消息存儲(CommitLog)
RocketMQ架構原理解析(三):消息索引(ConsumeQueue & IndexFile)
RocketMQ架構原理解析(四):消息生產端(Producer)

一、概述

如果你曾經使用過RocketMQ,那么一定對以下發送消息的代碼不陌生

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message(topic, new byte[] {'hello, world'});
producer.send(message);

寥寥幾行代碼,便是本文要論述的全部。阿里有句土話,叫“把復雜留給自己,把簡單交給別人”用在這里可能最合適不過了,這5行代碼中,最重要的是producer.start()producer.send(),也就是producer啟動及消息發送

二、Producer啟動

對應代碼producer.start()

其實僅僅一行代碼,在produer端的后台啟動了多個線程來協同工作,接下來我們逐一闡述

2.1、Netty

我們都知道,RocketMQ是一個集群部署、跨網絡的產品,除了producer、consumer需要網絡傳輸外,數據還需要在集群中流轉。所以一個高效、可靠的網絡組件是必不可少的。而RocketMQ選擇了netty

使用netty首先需要考慮的便是業務上的數據粘包問題,netty提供了一些較為常用的解決方案,如:固定長度(比如每次發送的消息長度均為1024byte)、固定分隔符(比如每次發送的消息長度均為1024byte)等。而RocketMQ使用的則是最為通用的head、body分離方式,即head存儲消息的長度,body存儲真正的消息數據,具體實現可參見類o.a.r.r.n.NettyRemotingClient

而消息收發這塊,RocketMQ將所有的消息都收斂到同一個協議類o.a.r.r.p.RemotingCommand中,即消息發送、接收都會將其封裝在該類中,這樣做的好處是不言而喻的,即統一規范,減輕網絡協議適配不同的消息類型帶來的負擔

其中較為重要的2個 ChannelHanlder 如下

  • org.apache.rocketmq.remoting.netty.NettyEncoder
    • 消息編碼,向 broker 或 nameServer 發送消息時使用,將RemotingCommand轉換為byte[]形式
  • org.apache.rocketmq.remoting.netty.NettyDecoder
    • 消息解碼,將byte[]轉換為RemotingCommand對象,接收 broker 返回的消息時,進行解碼操作

2.2、消息格式

消息格式是什么概念?在《消息存儲》章節不是已經闡述過消息格式了嗎?其實這是兩個概念,《消息存儲》章節是消息真正落盤時候的存儲格式,本小節的消息格式是指消息以什么樣的形態交給netty從而在網絡上進行傳輸

消息格式由MsgHeader及MsgBody組成,而消息的長度、標記、版本等重要參數都放在 header 中,body 中僅僅存儲數據,沒有額外字段;我們主要看一下 header 的數據格式

消息header格式

而站在 netty 視角來看,不論是 msgHeader 還是 msgBody,都屬於 netty 網絡消息的body部分,所以我們可以簡單畫一張 netty 視角的消息格式

netty視角的消息格式

2.2.1、Msg Header的自動適配

上文得知,RocketMQ將所有的消息類型、收發都收斂到類RemotingCommand中,但RocketMQ消息類型眾多,除了常見的消息發送、接收外,還有通過msgID查詢消息、msgKey查詢消息、獲取broker配置、清理不再使用的topic等等,用一個類適配如此多的類型,具體是如何實現的呢?當新增、修改一種類型又該怎么應對呢?

翻看源碼便發現,RemotingCommand的消息頭定義為一個接口org.apache.rocketmq.remoting.CommandCustomHeader,不同類型的請求都實現這個接口,並在自己的子類中定義成員變量;那RemotingCommand的消息頭又是如何自動解析呢?

public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
        Field[] fields = getClazzFields(customHeader.getClass());
        if (null == this.extFields) {
            this.extFields = new HashMap<String, String>();
        }
        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();
                if (!name.startsWith("this")) {
                    Object value = null;
                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }

                    if (value != null) {
                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}

答案就是反射,通過反射獲取子類的全部成員屬性,並放入變量extFields中,makeCustomHeaderToNet()通過犧牲少量性能的方式,換取了程序極大的靈活性與擴展性,當新增請求類型時,僅需要編寫新請求的encode、decode,不用修改其他類型請求的代碼

消息編解碼適配

2.3、Topic路由信息

2.3.1、Topic創建

發送消息的前置是需要創建一個topic,創建topic的admin命令如下

updateTopic -b <> -t <> -r <> -w <> -p <> -o <> -u <> -s <>

例如:
updateTopic -b 127.0.0.1:10911 -t testTopic -r 8 -w 8 -p 6 -o false -u false -s false

簡單介紹下每個參數的作用

  • -b broker 地址,表示 topic 所在 Broker,只支持單台Broker,地址為ip:port
  • -c cluster 地址,表示 topic 所在 cluster,會向 cluster 中所有的 broker 發送請求
  • -t topic 名稱
  • -r 可讀隊列數(默認為 8,后文還會展開)
  • -w 可寫隊列數(默認為 8,后文還會展開)
  • -p 指定新topic的讀寫權限 (W=2|R=4|WR=6)2表示當前topic僅可寫入數據,4表示僅可讀,6表示可讀可寫
  • -o set topic's order(true|false)
  • -u is unit topic (true|false)
  • -s has unit sub (true|false)

如果執行命令updateTopic -b 127.0.0.1:8899 -t testTopic -r 8 -w 8 意味着會在127.0.0.1:8899對應的broker下創建一個topic,這個topic的讀寫隊列都是 8

那如果是這樣的場景呢:集群A有3個master節點,當執行命令updateTopic -c clusterName -t testTopic -r 8 -w 8 后,站在集群A角度來看,當前topic總共創建了多少個寫隊列?其實 RocketMQ 接到這條命令后,會向3個 broker 分別發送創建 topic 的命令,這樣每個broker上都會有8個讀隊列,8個寫隊列,所以站在集群的視角,這個 topic 總共會有 24 個讀隊列,24 個寫隊列

創建流程

  • 一、創建Topic的客戶端(DefaultMQAdminExt
    • 第一步:該客戶端的啟動流程與Producer、Consumer類似,需要start(),它們共用MQClientInstance#start()方法,啟動后還有多個后台輪訓線程
    • 第二步:通過與NameServer交互,將指定ClusterName下所有的Broker信息拉下來
    • 第三步:依次向這些Broker發送創建Topic的請求
  • 二、Broker
    • 第一步:Broker收到創建Topic的請求后,做一些新Topic的初始化動作,而后將該Topic的元數據存儲在一個name為topics.json的本地文件中,因為在NameServer中並沒有對數據進行持久化,所以此文件即為Topic路由數據的唯一持久化文件,當然這樣的Broker一般是有多套的(其實此處是將所有json數據全部實例化后,替換本地文件,真實生產中,如果頻繁創建、銷毀topic,會帶來大量的文件IO,以及內存負擔,相信在未來近期的某個版本一定會進行修復
    • 第二步:向所有NameServer列表挨個發送Topic注冊請求
  • 三、NameServer
    • NameServer收到Broker注冊Topic的消息后,便將其路由信息存儲在內存中,當有Client請求Topic路由數據時,便將結果同步過去

我們以3個Broker、2個NameServer的集群舉例:
Topic注冊流程

  • Client -> Broker :3 次網絡IO,Client需要挨個向多個Broker發送注冊請求
  • Broker -> NameServer:6 次網絡IO,Broker需要向所有NameServer發送注冊請求

由此可見,NameServer確實是輕狀態的節點,路由的原始數據其實都存儲在Broker上,通過Broker向NameServer注冊,再有Client從NameServer處獲取元數據的方式來進行廣播、同步。此方案是rmq獨創,與kafka的重ZooKeeper形成對比,不過從實踐角度看,該架構還是比較穩定的

2.3.2、writeQueueNum VS readQueueNum

首選需要明確的是,讀、寫隊列,這兩個概念是 RocketMQ 獨有的,而 kafka 中只有一個partition的概念,不區分讀寫。一般情況下,這兩個值建議設置為相等;我們分別看一下 client 端對它們的處理 (均在類MQClientInstance.java

producer端

for (int i = 0; i < qd.getWriteQueueNums(); i++) {
    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
    info.getMessageQueueList().add(mq);
}

consumer端

for (int i = 0; i < qd.getReadQueueNums(); i++) {
    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
    mqList.add(mq);
}

如果2個隊列設置不相等,例如我們設置6個寫隊列,4個讀隊列的話:

writeQueueNum_and_writeQueueNum

這樣,4、5號隊列中的數據一定不會被消費。

  • writeQueueNum > readQueueNum
    • 大於 readQueueNum 部分的隊列永遠不會被消費
  • writeQueueNum < readQueueNum
    • 所有隊列中的數據都會被消費,但部分讀隊列數據一直是空的

這樣設計有什么好處呢?其實是更精細的控制了讀寫操作,例如當我們要遷移 broker 時,可以首先將寫入隊列設置為0,將客戶端引流至其他 broker 節點,等讀隊列數據也處理完畢后,再關閉 read 操作

2.3.3、路由數據格式

topic的路由數據如何由Admin發起創建,再被各個broker響應,繼而被nameServer統一組織創建的流程我們暫且不討論,為防止發散,我們直接從producer從nameServer獲取路由數據開始。從nameServer獲取到的路由數據格式如下

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

而存放路由數據的結構是queueDatasbrokerDatas

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

在此,簡單闡述一下RocketMQ的cluster、brokerName、brokerId的概念
cluster_brokerName_brokerId概念

上圖描述了一個cluster下有3個broker,每個broker又有1個master,2個slave組成;這也就是為什么類BrokerData中有HashMap<Long, String> brokerAddrs變量的原因,因為可能同一個brokerName下由多個節點組成。注:master節點的編號始終為0

2.3.4、Topic路由信息何時發生變化

這些路由信息什么時候發生變化呢?我們舉例說明

舉例1:某集群有3台 master,分別向其中的2台發送了創建topic的命令,此時所有的clent端都知道這個topic的數據在這兩個broker上;這個時候通過admin向第3台broker發送創建topic命令,nameServer的路由信息便發生了變更,等client端30秒輪訓后,便可以更新到最新的topic路由信息

舉例2:某集群有3台 master,topic分別在3台broker上都創建了,此時某台broker宕機,nameServer將其摘除,等待30秒輪詢后,client拿到最新路由信息

思考:client 端路由信息的變化是依托於30秒的輪詢,如果路由信息已經發生變化,且輪詢未發生,client端拿着舊的topic路由信息訪問集群,一定會有短暫報錯,此處也是待優化的點

2.3.5、定時更新Topic路由信息

RocketMQ會每隔30秒更新topic的路由信息

定時更新topic路由信息

此處簡單留意一下TopicRouteDataTopicPublishInfo,其實TopicPublishInfo是由TopicRouteData變種而來,多了一個messageQueueList的屬性,在producer端,該屬性為寫入隊列,即某個topic所有的可寫入的隊列集合

此處拋出一個問題,如果producer只想某個topic發送了一條消息,后續再沒有發送過,這種設計會帶來哪些問題?如果這種場景頻繁發生呢?

2.4、與Broker心跳

主要分為兩部分:

  • 1、清空無效broker
  • 2、向有效的broker發送心跳

2.4.1、清空無效的broker

由上節得知,RocketMQ會獲取所有已經注冊的topic所在的broker信息,並將這些信息存儲在變量brokerAddrTable中,brokerAddrTable的存儲結構如下

ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable ;
  • key: brokerName,例如一個master帶2個slave都屬於同一個brokerName
  • val: HashMap<Long, String>,key為brokerId(其中master的brokerId固定為0),val為ip地址

如何判斷某個broker有效無效呢?判斷依據便是MQClientInstance#topicRouteTable,這個變量是上節中從nameServer中同步過來的,如果brokerAddrTable中有broker A、B、C,而topicRouteTable只有A、B的話,那么就需要從brokerAddrTable中刪除C。

需要注意的是,在整個check及替換過程中都添加了獨占鎖lockNamesrv,而上節中維護更新topic路由信息也是指定的該鎖

2.4.2、發送心跳數據

發送心跳數據

此處目的僅為與broker保持網絡心跳,如果連接失敗或發生異常,僅會打印日志,並不會有額外操作

2.5、多Producer

這里簡單提一下,其實在單個進程中,是可以啟動多個Producer的,且相互隔離;實現起來感覺也比較容易,感覺直接new DefaultMQProducer()就行。不過這里有個性能上的問題,就是如果兩個Producer操作了同樣的Topic,此時去NameServer拉取路由數據的時候,將會線性的放大,因此RMQ引入了MQClientInstance概念,即在單個進程中,MQClientInstance是單例的,諸如獲取Topic路由數據等,均是其統一發起,讀者在源碼中看到這個類時不要覺得陌生哈

多Producer

三、消息發送

消息發送流程

消息發送比較重要的是2點內容

  • 發送數據的負載均衡問題;RocketMQ默認采用的是輪訓的方式
  • 消息發送的方式;分同步、異步、單向

3.1、消息保序 vs 負載均衡

默認選擇隊列的策略為輪詢方式,來保證消息可以均勻的分配到每個隊列;

既然說到隊列,就不得不提到消息的有序性問題

3.1.1、普通消息

消息是無序的,可發送至任意隊列,producer 也不關心消息會存儲在哪個隊列。在這種模式下,如果發送失敗,producer 會按照輪詢的方式,重新選取下一個隊列進行重試

producer.send(message);

3.1.2、普通有序消息

用戶可根據消息內容來選擇一個隊列發送 ,在這種情況下,消息也一般是保序的,例如我們可以通過業務字段(例如用戶id)的 msgKey 取模來選擇隊列,這樣同樣 msgKey 的消息必定會落在同一個隊列中。

與發送普通消息不同,如果發送失敗,將不會進行重試,也比較好理解,普通消息發送失敗后,也不會針對當前隊列進行重試,而是選擇下一個隊列

producer.send(zeroMsg, (mqs, msg, arg) -> {
    int index = msg.getKeys().hashCode() % mqs.size();
    return mqs.get(index);
}, 1000);

但也存在異常情況,例如當前 topic 的路由信息發生了變化,取模后消息可能命中了另外一個隊列,自然也無法做到嚴格保序

3.1.3、嚴格有序消息

即 producer 自己嚴格發送給指定的隊列,如果發送異常則快速失敗,可見這種方式可以嚴格保證發送的消息在同一個隊列中,即便 topic 路由信息發生變化,也可以嚴格保序

producer.send(message, messageQueue);

3.2、消息發送的3種方式

RocketMQ的rpc組件采用的是netty,而netty的網絡請求設計是完全異步的,所以一個請求避免一定可以拆成以下3個步驟

  • a、客戶端發送請求到服務器(由於完全異步,所以請求數據可能只放在了socket緩沖區,並沒有出網卡
  • b、服務器端處理請求(此過程不涉及網絡開銷,不過通常也是比較耗時的
  • c、服務器向客戶端返回應答(請求的response

3.2.1、同步發送消息

SendResult result = producer.send(zeroMsg);

此過程比較好理解,即完成a、b、c所有步驟后才會返回,耗時也是 a + b + c 的總和

3.2.2、異步發送消息

通常在業務中發送消息的代碼如下:

SendCallback sendCallback = new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // doSomeThing;
    }
    @Override
    public void onException(Throwable e) {
        // doSomeThing;
    }
};
producer.send(zeroMsg, sendCallback);

而RocketMQ處理異步消息的邏輯是,直接啟動一個線程,而最終的結果異步回調SendCallback

ExecutorService executor = this.getAsyncSenderExecutor();
try {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
            } catch (Exception e) {
                sendCallback.onException(e);
            }
        }

    });
} catch (RejectedExecutionException e) {
    throw new MQClientException("executor rejected ", e);
}

3.2.2、單向發送消息

producer.sendOneway(zeroMsg);

此模式與sync模式類似,都要經過producer端在數據發送前的數據組裝工作,不過在將數據交給netty,netty調用操作系統函數將數據放入socket緩沖區后,所有的過程便已結束。什么場景會用到此模式呢?比如對可靠性要求並不高,但要求耗時非常短的場景,比如日志收集等

三個請求哪個更快呢?如果單論一個請求的話,肯定是async異步的方式最快,因為它直接把工作交給另外一個線程去完成,主線程直接返回了;但不論是async還是sync,它們都是需要將 a、b、c 3個步驟都走完的,所以總開銷並不會減少。但oneWay因為只需將數據放入socket緩沖區后,client 端就直接返回了,少了監聽並解析 server 端 response 的過程,所以可以得到最好的性能

四、總結

本章闡述了producer端相對重要的一些功能點,感覺比較核心的還是隊列相關的概念;但RocketMQ發展迭代了這么多年,也涵蓋了很多及細小的特性,本文不能窮盡,比如“消息的壓縮”、“規避發送延遲較長的broker”、“超時異常”等等,這些功能點獨立且零碎,讀源碼時可以帶着問題跟進,這樣針對性強,效率也會高很多


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM