RocketMQ(九):消息發送(續)


匠心零度 轉載請注明原創出處,謝謝!

RocketMQ網絡部署圖

  • NameServer:在系統中是做命名服務,更新和發現 broker服務。
  • Broker-Master:broker 消息主機服務器。
  • Broker-Slave: broker 消息從機服務器。
  • Producer: 消息生產者。
  • Consumer: 消息消費者。

說明: rocketmq系列都將會以rocketmq-4.1.0-incubating進行介紹。

在閱讀源碼時做了一定的注釋,公眾號【匠心零度】回復:rocketmq,可獲得基於rocketmq4.1.0加詳細中文代碼注釋 。歡迎大家 star、fork !

上篇RocketMQ(八):消息發送主要分析了下一般發送流程,本篇將會介紹下定時發送,順序發送,批量發送等情況 。

消息發送概述

上面的圖大概就是producer發送message到broker的核心邏輯了。

備注:本篇將會介紹下定時發送,順序發送,批量發送情況 ,這些情況說明都是站在producer角度進行說明,涉及到broker的內容會在分析broker相關內容的時候進行分析。

定時發送

何為定時消息

Producer 將消息發送到 MQ 服務端,但並不期望這條消息立馬投遞,而是延遲一定時間后才投遞到 Consumer 進行消費。

調用形式

與普通的調用發送基本沒有什么區別,唯一就是多了一個消息設置setDelayTimeLevel即可。代碼調用如圖:

那么這里的級別應該填寫什么呢?對應什么呢?--->這塊后續會分析,今天這里簡單說明下:

其他的就和普通發送沒有任何區別了,關鍵處理在broker,后續分析。

順序發送

何為順序發送

參考:https://help.aliyun.com/document_detail/49319.html?spm=a2c4g.11186623.2.3.21aKRd

調用形式

我們來看看具體內容實現調用就明白怎么回事了:

  • 獲取到所有可發送的信息

  • 發送內容

  • 用來區分(比如訂單號,這一類的訂單號是有順序的,但是和其他訂單號可以無序)

 sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

這里mqs就是獲取到所有可發送的信息,而orderId就是用來區分的,每次拿這個orderId和所有的進行取模(如果所有的沒有變化的情況下面)那么一種都是固定的那個隊列了(隊列就是先進先出了),后續消費在分析消費如何處理這塊。

備注:這里的順序也是相對的,如果添加了broker或者broker減少了,那么取模的信息勢必會不一致,所以需要明白下。

批量發送

何為批量發送

批量這個概念我相信大家一定都非常熟悉了,在很多調優的時候,比如數據庫批量處理,有些請求進行合並發送等都是類似實現,那么rocketmq批量發送也是為了追求性能,特別在數量量級特別大的時候,批量效果就非常明顯了。

備注:既然是批量就是等一批之后發送,那么實時性一定可能就稍微有點點延遲了(可以忽略,特別數據量多的情況)。我一般處理批量的就2個維度,達到一個就可以觸發,1.達到給定條數 2.達到給定時間(比如3s一次,如果這個時候數據量還是不夠也會進行發送)……

rocketmq只是提供接口,發送一批數據,那么何時發送這一批數據就是根據自己的選擇,就是上面說的兩條就是一般的做法:

  • 達到給定條數 。
  • 達到給定時間。
調用形式

rocketmq這里有一個好處,就是在批量發送的時候,會做一個簡單的轉換,減少網絡傳輸,學習下:

把這些轉化為a、b、c等形式。

我們來看看rocketmq給我們的批量發送的例子,為什么會給我們2個例子呢?需要思考!!!,下面我會來為大家講解下:

備注: 批量發送需要相同的topic以及相同的waitStoreMsgOK 和不支持定時發送。

Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support。

這種發送就是批量(這里的批量不是特別大),就自己發送走了。

這里的批量(批量的數據太多),所以進行分割之后發送了。最后調用的方法還是一樣的。

我們知道目前rocketmq默認支持4M發送消息內容(不管是一條還是批量),假如需要調整也可以調整,那么會一直沒有上限嗎?還有很多人說調整了10M為什么沒有生效(這里是需要Producer和服務端broker都調整才行,后續說明解釋,還有由於是基於netty網絡傳輸,可能你需要看看RocketMQ(二):RPC通訊,這里進行了說明)

這里決定了最大最大只能是16M,如果需要就需要修改了,一般也不建議修改,就和批量發送的第二個方式一樣就行,進行拆開批量發送就是了。


如果讀完覺得有收獲的話,歡迎點贊、關注、加公眾號【匠心零度】,查閱更多精彩歷史!!!

加入知識星球,一起探討!

img


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM