匠心零度 轉載請注明原創出處,謝謝!
RocketMQ網絡部署圖
- NameServer:在系統中是做命名服務,更新和發現 broker服務。
- Broker-Master:broker 消息主機服務器。
- Broker-Slave: broker 消息從機服務器。
- Producer: 消息生產者。
- Consumer: 消息消費者。
說明: rocketmq系列都將會以rocketmq-4.1.0-incubating進行介紹。
在閱讀源碼時做了一定的注釋,公眾號【匠心零度】回復:rocketmq,可獲得基於rocketmq4.1.0加詳細中文代碼注釋 。歡迎大家 star、fork !
上篇RocketMQ(八):消息發送主要分析了下一般發送流程,本篇將會介紹下定時發送,順序發送,批量發送等情況 。
消息發送概述
上面的圖大概就是producer發送message到broker的核心邏輯了。
備注:本篇將會介紹下定時發送,順序發送,批量發送情況 ,這些情況說明都是站在producer角度進行說明,涉及到broker的內容會在分析broker相關內容的時候進行分析。
定時發送
何為定時消息
Producer 將消息發送到 MQ 服務端,但並不期望這條消息立馬投遞,而是延遲一定時間后才投遞到 Consumer 進行消費。
調用形式
與普通的調用發送基本沒有什么區別,唯一就是多了一個消息設置setDelayTimeLevel即可。代碼調用如圖:
那么這里的級別應該填寫什么呢?對應什么呢?--->這塊后續會分析,今天這里簡單說明下:
其他的就和普通發送沒有任何區別了,關鍵處理在broker,后續分析。
順序發送
何為順序發送
參考:https://help.aliyun.com/document_detail/49319.html?spm=a2c4g.11186623.2.3.21aKRd
調用形式
我們來看看具體內容實現調用就明白怎么回事了:
-
獲取到所有可發送的信息
-
發送內容
-
用來區分(比如訂單號,這一類的訂單號是有順序的,但是和其他訂單號可以無序)
sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
這里mqs就是獲取到所有可發送的信息,而orderId就是用來區分的,每次拿這個orderId和所有的進行取模(如果所有的沒有變化的情況下面)那么一種都是固定的那個隊列了(隊列就是先進先出了),后續消費在分析消費如何處理這塊。
備注:這里的順序也是相對的,如果添加了broker或者broker減少了,那么取模的信息勢必會不一致,所以需要明白下。
批量發送
何為批量發送
批量這個概念我相信大家一定都非常熟悉了,在很多調優的時候,比如數據庫批量處理,有些請求進行合並發送等都是類似實現,那么rocketmq批量發送也是為了追求性能,特別在數量量級特別大的時候,批量效果就非常明顯了。
備注:既然是批量就是等一批之后發送,那么實時性一定可能就稍微有點點延遲了(可以忽略,特別數據量多的情況)。我一般處理批量的就2個維度,達到一個就可以觸發,1.達到給定條數 2.達到給定時間(比如3s一次,如果這個時候數據量還是不夠也會進行發送)……
rocketmq只是提供接口,發送一批數據,那么何時發送這一批數據就是根據自己的選擇,就是上面說的兩條就是一般的做法:
- 達到給定條數 。
- 達到給定時間。
調用形式
rocketmq這里有一個好處,就是在批量發送的時候,會做一個簡單的轉換,減少網絡傳輸,學習下:
把這些轉化為a、b、c等形式。
我們來看看rocketmq給我們的批量發送的例子,為什么會給我們2個例子呢?需要思考!!!,下面我會來為大家講解下:
備注: 批量發送需要相同的topic以及相同的waitStoreMsgOK 和不支持定時發送。
Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support。
這種發送就是批量(這里的批量不是特別大),就自己發送走了。
這里的批量(批量的數據太多),所以進行分割之后發送了。最后調用的方法還是一樣的。
我們知道目前rocketmq默認支持4M發送消息內容(不管是一條還是批量),假如需要調整也可以調整,那么會一直沒有上限嗎?還有很多人說調整了10M為什么沒有生效(這里是需要Producer和服務端broker都調整才行,后續說明解釋,還有由於是基於netty網絡傳輸,可能你需要看看RocketMQ(二):RPC通訊,這里進行了說明)
這里決定了最大最大只能是16M,如果需要就需要修改了,一般也不建議修改,就和批量發送的第二個方式一樣就行,進行拆開批量發送就是了。
如果讀完覺得有收獲的話,歡迎點贊、關注、加公眾號【匠心零度】,查閱更多精彩歷史!!!
加入知識星球,一起探討!