消息發送
Topic
Topic用於將消息按主題做划分,Producer將消息發往broker中指定的Topic,Consumer訂閱該Topic就可以收到這條消息。Topic跟發送方和消費方都沒有強關聯關系,發送方可以同時往多個Topic投放消息,消費方也可以訂閱多個Topic的消息。在RocketMQ中,Topic是一個上邏輯概念。消息存儲不會按Topic分開。舉個例子,現在你的訂單系統需要往MQ里發送訂單消息,那么此時你就應該建一個Topic,他的名字可以叫做:topic_order_info,也就是一個包含了訂單信息的數據集合。要是你有一些商品數據要發送消息到MQ里,你就應該創建一個Topic叫做“topic_product_info”,代表里面都是商品數據,那些想要從MQ里獲取商品數據的系統就可以從“topic_product_info”里獲取了。
MessageQueue,Topic,Broker的關系
我們要發送消息的時候,會根據業務指定一個topic,然后會指定這個topic對應了多少個隊列。其實rocketmq的隊列本質上就是一個數據分片的機制,隊列將一個topic拆分成很多個數據分片,然后每個broker機器上都存儲一些隊列。比如現在一個topic我們指定4個queue,我們知道topic數據是分布式存儲在多個broker中的。那差不多物理結構是這樣子:
消息發送與持久化過程
剛才的圖就是一個雙master的主從架構,那生產者發送消息的時候,怎么知道應該發送到哪個機器呢?其實生產者它首先會跟nameserver通信、獲取topic和broker信息,然后就按照一定的規則發送到某個broker里面去。
在rocketmq中每個messageQueue都會對應一個consumeQueue,因為消息最終是要寫入到磁盤文件(CommitLog)的,consumeQueue主要記錄了消息在文件中的offset偏移量(也可以理解成在文件中的下標位置)。為了達到近乎內存寫性能,Broker是基於OS操作系統的 PageCache 和 順序寫 兩個機制,來提升寫入效率。consumeQueue記錄下標位置后,首先寫入pageCache中,然后通過OS線程異步刷盤到CommitLog文件中;當然,與此相對應還有同步刷盤,就是consumeQueue直接寫入到CommitLog中。
* 異步刷盤時會有消息丟失風險,因為消息寫入PageCache后就返回ACK了;同步刷盤是寫到磁盤后才返回ACK。
DLedger主從同步原理
Broker實現高可用,至少要有一個Broker組,master接收消息后,slave去同步消息。也就是說同一條數據其實會有三份。DLedger來實現broker高可用,實際上就是由DLedger來代為管理broker的commitlog,然后根據Raft協議選舉出主從節點,一條消息發送過來,首先是標記為uncommitted狀態,過半節點同步消息后,則更新為commited狀態。
細嗅薔薇
Producer和Broker的長連接
Producer向Broker發送消息前,需要先建立長鏈接,Broker中會有個Reactor主線程專門監聽Producer建立連接的請求。然后Producer和Broker中會有一個SocketChannel,用來代表長連接它們建立好的長連接。
Reactor與Worker線程池
Producer會用SocketChannel向Broker發送消息,然后Broker中會有一個Reactor線程池,里面默認有3個線程,會去監聽SocketChannel中到達的消息。然后會將請求轉交給Worker線程池、默認8個線程,Worker來進行一系列的預處理,比如SSL加密驗證、編碼解碼、連接空閑檢查、網絡連接等。
最后再交給SendMessage線程池進行刷盤處理。
mmap+PageCache
我們普通的文件IO操作去進行磁盤文件的讀寫,那會存在多次數據拷貝性能問題。首先從磁盤上把數據讀取到內核IO緩沖區里去,然后再從內核IO緩存區里讀取到用戶進程私有空間里去,然后我們才能拿到這個文件里的數據。性能是很差的。
而RocketMQ就基於mmap技術+PageCache技術進行了優化。mmap是一種內存映射技術,就是把物理磁盤文件的地址和用戶進程私有空間的一些虛擬內存地址進行了一個映射。在上面我們說到過CommitLog磁盤文件寫入數據,文件大小默認1G那是因為mmap在文件映射時,一般有大小限制,在1.5GB~2GB之間。所以RocketMQ才讓CommitLog單個文件在1GB,ConsumeQueue文件在5.72MB,不會太大。
消息接收
消費組和topic的關系
我們初始化消費者的時候都會指定一個分組名,比如積分系統、倉儲系統什么的就可以起個名字:stock_consumer_group、wms_consumer_group,他們可以訂閱topic去讀取broker的消息。比如現在訂單系統下單后發送了一個消息,而這兩個系統都訂閱了這個消息的topic,那么他們則可以都拿到這個消息。但是比如積分系統里面部署了n台機器做集群,只有一台機器能拿到消息,同一個消費組的其他機器是無法獲取的。
集群模式和廣播模式
默認是集群模式,也就是說一個消費組獲取到一條消息,只會交給組內的一台機器去處理,但是我們可以通過如下設置來改變為廣播模式:consumer.setMessageModel(MessageModel.BROADCASTING);如果修改為廣播模式,那么消費組內每台機器都可以獲取到這條消息。不過這個播模式其實用的很少。消費者會比較均勻的消費每個MessageQueue的消息。如果過程中有機器新增和宕機,它們會立即重新分配MessageQueue。
Rocketmq消費模型
RocketMQ默認是采用pushConsumer方式消費的,從概念上來說是推送給消費者,它的本質是pull+長輪詢。這樣既通過長輪詢達到了push的實時性,又有了pull的可控性。系統收到消息后會自動處理消息和offset(消息偏移量),如果期間有新的consumer加入會自動做負載均衡(集群模式下offset存在broker中; 廣播模式下offset存在consumer里)。當然我們也可以設置為pullConsumer模式,這樣靈活性會提高,但是代碼卻會很復雜,需要手動維護offset,消息存儲和狀態。
Rocketmq消費策略
消費者從那個位置消費,分別為:
CONSUME_FROM_LAST_OFFSET: 第一次啟動從隊列最后位置消費,后續再啟動接着上次消費的進度開始消費
CONSUME_FROM_FIRST_OFFSET:第一次啟動從隊列初始位置消費,后續再啟動接着上次消費的進度開始消費
CONSUME_FROM_TIMESTAMP: 第一次啟動從指定時間點位置消費,后續再啟動接着上次消費的進度開始消費
* 以上所說的第一次啟動是指從來沒有消費過的消費者,如果該消費者消費過,那么會在broker端記錄該消費者的消費位置,如果該消費者掛了再啟動,那么自動從上次消費的進度開始。一般來說,我們都會選擇CONSUME_FROM_FIRST_OFFSET,這樣你剛開始就從Topic的第一條消息開始消費。
。