RocketMQ 解析


1、RocketMQ網絡部署圖

 

 

  • RocketMQ的幾個關鍵角色和配置
  • Producer

兩種消息發送方式:

 

1.同步發送:發送成功后繼續執行代碼邏輯。

 

2.異步發送:發送后,不管成功失敗執行代碼邏輯。成功后調用回調方法。

 

  • Broker

兩種刷盤方式,flushDiskType配置,SYNC_FLUSH,ASYNC_FLUSH。

 

1.同步刷盤方式:消息寫入磁盤后再返回成功狀態。

 

2.異步刷盤方式:消息寫入內存后就返回成功狀態。

 

兩種復制方式,表示消息從Master復制到Slave的方式,brokerRole,ASYNC_MASTER(異步master),SYNC_MASTER(同步master),SLAVE(slave)。

 

1.同步復制方式:等Master和Slave都寫入成功后才返回寫入成功。

 

2.異步復制方式:Master寫入成功后就返回寫入成功。

 

  • Consumer

三種消費方式 :

 

1.Push(服務端主動推送消息),RocketMQ服務器收到消息后自動調用消費者函數來處理消息,自動維護Offset。支持兩種消息模式,Clustering模式,同一個ConsumerGroup的每個Consumer消費訂閱消息的一部分內容,broadcasting模式,同一個ConsumerGroup的每個Consumer都消費所訂閱的的全部消息。

 

2.Pull (客戶端主動拉取消息),Client端循環從Server端拉取消息。需要客戶端自己維護Offset。

 

3.長輪詢消費方式,Client發送消息請求,Server端接受請求,如果發現Server隊列里沒有新消息,Server端不立即返回,而是持有這個請求一段時間(通過設置超時時間來實現),在這段時間內輪詢Server隊列內是否有新的消息,如果有新消息,就利用現有的連接返回消息給消費者;如果這段時間內沒有新消息進入隊列,則返回空。

 

深入了解了上面三個角色,我們來總結下雙master,雙slave模式下的整個發送,消費流程。生產者發送消息,消息會負載均衡到兩個Master上,如果master的刷盤方式是同步刷盤方式,復制方式是同步復制方式,需要消息寫到master和slave的硬盤上后,服務器才會放回發送消息成功。消息存儲到服務器后,消費者根據自己的消費方式來消費消息,如果是Push,消息到達服務器后馬上推送消息到消費者,如果是pull,消費拉取消息后再消費。

 

 

1.1 RocketMQ網絡部署特點:

  • NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。相對來說,nameserver的穩定性非常高。原因有二:

1)nameserver互相獨立,彼此沒有通信關系,單台nameserver掛掉,不影響其他nameserver,即使全部掛掉,也不影響業務系統使用。無狀態
2)nameserver不會有頻繁的讀寫,所以性能開銷非常小,穩定性很高。
  • Broker部署相對復雜,Broker四種集群方式

Broker分為Master與Slave(Slave不可寫,但可讀,類似於MySQL的主備方式),一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有 NameServer。

1)單個master:這是一種風險比較大的集群方式,因為一旦Borker重啟或宕機期間,將會導致這個服務不可用,因此是不建議線上環境去使用的。
2)多個master:
一個集群全部都是Master,沒有Slave。

  優點:配置簡單,單個Master宕機或者是重啟維護對應用沒有什么影響的,在磁盤配置為RAID10時,即使機器宕機不可恢復的情況下,消息也不會丟失(異步刷盤丟失少量消息,同步刷盤則是一條都不會丟失),性能最高

  缺點:當單個Broker宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息的實時性會受到影響

3)多master多salve異步復制,每個Master配置一個Slave,有多對的Master-Slave,HA采用的是異步復制方式,主備有短暫的消息延遲,毫秒級別的(Master收到消息之后立刻向應用返回成功標識,同時向Slave寫入消息)。

  優點:即使是磁盤損壞了,消息丟失的非常少,且消息實時性不會受到影響,因為Master宕機之后,消費者仍然可以從Slave消費,此過程對應用透明,不需要人工干預,性能同多個Master模式機會一樣。

  缺點:Master宕機,磁盤損壞的情況下,會丟失少量的消息

4)多master多salve同步雙寫,每個Master配置一個Slave,有多對的Master-Slave,HA采用的是同步雙寫模式,主備都寫成功,才會向應用返回成功。

  優點:數據與服務都無單點,Master宕機的情況下,消息無延遲,服務可用性與數據可用性都非常高

  缺點:性能比異步復制模式略低,大約低10%左右,發送單個Master的RT會略高,目前主機宕機后,Slave不能自動切換為主機,后續會支持自動切換功能。

  • Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
  • Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。

1.2 安裝及使用步驟:

1、下載並安裝
  根目錄執行 mvn clean package -Dmaven.test.skip=true或mvn -Preplease-all -DskipTests clean install -U
  並在distribution/target/apache-rocketmq 目錄下找到打好的包.解壓至指定目錄
 
2、rocketmq的啟動
  啟動namesrv服務:nohup sh bin/mqnamesrv &  查看日志:tail -f ~/logs/rocketmqlogs/namesrv.log
  啟動broker服務:nohup sh bin/mqbroker &   查看日志:tail -f ~/logs/rocketmqlogs/broker.log
  nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true & 默認自動創建topic,否則報錯no route info of this topic
 
3、rocketmq服務關閉
  關閉namesrv服務:sh bin/mqshutdown namesrv
  關閉broker服務 :sh bin/mqshutdown broker

 2.  RocketMQ配置文件 - broker

眾所周知,RocketMQ有多種集群部署方式,它們的配置文件也是分開的,在安裝包conf目錄下有官方自帶配置文件模版自上而下依次是:兩主兩從異步,兩主兩從同步,兩主

broker.conf,這個是相當於配置的簡單模板,另外其他的配置也可以到上面說的2m-2s那些目錄中去參考一下,這個broker.conf是不能直接使用的,因為broker啟動的時候用-c參數傳入配置文件,這里只認識*.properties的配置文件,所以這里應該分別執行:

cp broker.conf broker1.properties
cp broker1.properties broker2.properties

如下:

說明:

  • 2m-noslave: 多Master模式
  • 2m-2s-sync: 多Master多Slave模式,同步雙寫
  • 2m-2s-async:多Master多Slave模式,異步復制

其中namesrvAddr:主機地址,brokerClusterName:集群名稱,brokerName :分片名稱 ,deleteWhen=04:刪除文件時間點,默認是凌晨4點
,fileReservedTime=120:文件保留時間,默認48小時,brokerId:分片id編號 ;brokerRole分片角色。

注意:其中主從之間的分片名稱相同。主從區分是brokerId 主 0,從 1。brokerRole 主MASTER從SLAVE。

 

  • RocketMQ默認提供的配置文件都是最基本的,很多配置都是默認值,生產環境中需要根據實際情況進行修改。
  • #所屬集群名字 brokerClusterName=rocketmq-cluster
  • #broker名字,注意此處不同的配置文件填寫的不一樣 brokerName=broker-a|broker-b
  • #0表示Master,>0表示Slave brokerId=0 #nameServer地址,分號分割 namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
  • #在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4
  • #是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true
  • #是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true
  • #Broker 對外服務的監聽端口 listenPort=10911
  • #刪除文件時間點,默認凌晨 4點 deleteWhen=04
  • #文件保留時間,默認 48 小時 fileReservedTime=120
  • #commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824
  • #ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000
  • #destroyMapedFileIntervalForcibly=120000
  • #redeleteHangedFileInterval=120000
  • #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88
  • #存儲路徑 storePathRootDir=/usr/local/alibaba-rocketmq/store
  • #commitLog 存儲路徑 storePathCommitLog=/usr/local/alibaba-rocketmq/store/commitlog
  • #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/alibaba-rocketmq/store/consumequeue
  • #消息索引存儲路徑 storePathIndex=/usr/local/alibaba-rocketmq/store/index
  • #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/alibaba-rocketmq/store/checkpoint
  • #abort 文件存儲路徑 abortFile=/usr/local/alibaba-rocketmq/store/abort
  • #限制的消息大小 maxMessageSize=65536
  • #flushCommitLogLeastPages=4
  • #flushConsumeQueueLeastPages=2
  • #flushCommitLogThoroughInterval=10000
  • #flushConsumeQueueThoroughInterval=60000
  • #Broker 的角色

    - ASYNC_MASTER 異步復制Master

    - SYNC_MASTER 同步雙寫Master

    - SLAVE brokerRole=ASYNC_MASTER

  • 刷盤方式 #- ASYNC_FLUSH
  • 異步刷盤 #- SYNC_FLUSH
  • 同步刷盤 flushDiskType=ASYNC_FLUSH
  • #checkTransactionMessageEnable=false
  • #發消息線程池數量 #sendMessageThreadPoolNums=128
  • #拉消息線程池數量 #pullMessageThreadPoolNums=128

 

  • Broker向namesrv注冊
1.      獲取namesrv的地址列表(是亂序的)

2.      遍歷向每個namesrv注冊topic的配置信息topicconfig
 
        

  •  Topic在broker文件上的存儲json格式

  Namesrv接收Broker注冊的topic信息, namesrv只存內存,但是broker有任務定時推送

  1.   接收數據向RouteInfoManager注冊。

    Broker初始化加載本地配置,配置信息是以json格式存儲在本地, rocketmq強依賴fastjson作轉換, RocketMq通過ConfigMananger來管理配置加載以及持久化

 

復制代碼
 
1.      加載topic配置${user.home}/store/config/topics.json
{
"dataVersion":{
           "counter":2,
           "timestatmp":1393729865073
},
"topicConfigTable":{
           //根據consumer的group生成的重試topic
           "%RETRY% group_name":{
                    "perm":6,
                    "readQueueNums":1,
                    "topicFilterType":"SINGLE_TAG",
                    "topicName":"%RETRY%group_name",
                    "writeQueueNums":1
           },

        "TopicTest":{
            "perm":6, // 100讀權限 , 10寫權限 6是110讀寫權限 "readQueueNums":8, "topicFilterType":"SINGLE_TAG", "topicName":"TopicTest", "writeQueueNums":8 } } }

2.加載消費進度偏移量  ${user.home}/store/config/consumerOffset.json
{

"offsetTable":{
       //重試隊列消費進度為零
      "%RETRY% group_name@group_name":{0:0}, 
      //分組名group_name消費topic為TopicTest的進度為:
      // 隊列queue=0  消費進度23
      // 隊列 queue=2  消費進度為22  等等…

       "TopicTest@ group_name":{0:23,1:23,2:22,3:22,4:21,5:18,6:18,7:18}
}
}
 
3. 加載消費者訂閱關系 ${user.home}/store/config/subscriptionGroup.json
{

         "dataVersion":{
                   "counter":1,
                   "timestatmp":1393641744664
         },
         "group_name":{
                            "brokerId":0,  //0代表這台broker機器為master,若要設為slave值大於0
                            "consumeBroadcastEnable":true,
                            "consumeEnable":true,
                            "consumeFromMinEnable":true,
                            "groupName":"group_name",
                            "retryMaxTimes":5,
                            "retryQueueNums":1,
                            "whichBrokerWhenConsumeSlowly":1
                   }
         }
}
 
復制代碼

 

 2.1 broker的消息存儲

 存儲特點:

如上圖所示:
(1)消息主體以及元數據都存儲在**CommitLog**當中
(2)Consume Queue相當於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。 (3)每次讀取消息隊列先讀取consumerQueue,然后再通過consumerQueue去commitLog中拿到消息主體。

Rocketmq的消息的存儲是由consume queue和 commitLog 配合完成的。

ConsumeQueue是定長的結構,每1條記錄固定的20個字節。

Consumer消費消息的時候,要讀2次:先讀ConsumeQueue得到offset,再讀CommitLog得到消息內容。

2.1.1  CommitLog文件(物理隊列

CommitLog是用於存儲真實的物理消息的結構,保存消息元數據,所有消息到達Broker后都會保存到commitLog文件,這里需要強調的是所有topic的消息都會統一保存在commitLog中。
舉個例子:當前集群有TopicA, TopicB,
這兩個Toipc的消息會按照消息到達的先后順序保存到同一個commitLog中,而不是每個Topic有自己獨立的commitLog
onsumeQueue是邏輯隊列,僅僅存儲了CommitLog的位移而已,真實的存儲都在本結構中。
首先這里會使用CommitLog.this.topicQueueTable.put(key, queueOffset),其中的key是 topic-queueId, queueOffset是當前這個key中的消息數,每增加一個消息增加一(不會自減);
這里queueOffset的用途如下:每次用戶請求putMessage的時候,將queueOffset返回給客戶端使用,這里的queueoffset表示邏輯上的隊列偏移。

消息存放物理文件,每台broker上的commitLog被本機器所有queue共享不做區分
  • commitlog文件的存儲地址:$HOME\store\commitlog\${fileName}
  • 一個消息存儲單元長度是不定的,順序寫但是隨機讀
  • 每個commitLog文件的默認大小為 1G =1024*1024*1024,滿1G之后會自動新建CommitLog文件做保存數據用
  • commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當這個文件滿了,

    第二個文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個文件名字為00000000002147483648,起始偏移量為2147483648消息存儲的時候會順序寫入文件,

    當文件滿了,寫入下一個文件。

 

  CommitLog的清理機制:

 

  • 按時間清理,rocketmq默認會清理3天前的commitLog文件;
  • 按磁盤水位清理:當磁盤使用量到達磁盤容量75%,開始清理最老的commitLog文件。

 

 
             

1)、CommitLog 文件生成規則

偏移量:每個 CommitLog 文件的大小為 1G,一般情況下第一個 CommitLog 的起始偏移量為 0,第二個 CommitLog 的起始偏移量為 1073741824 (1G = 1073741824byte)。

2)、怎么知道消息存儲在哪個 CommitLog 文件上?

假設 1073742827 為物理偏移量(物理偏移量也即全局偏移量),則其對應的相對偏移量為 1003(1003 = 1073742827 - 1073741824),並且該偏移量位於第二個 CommitLog。

index 和 ComsumerQueue 中都有消息對應的物理偏移量,通過物理偏移量就可以計算出該消息位於哪個 CommitLog 文件上。

復制代碼
文件地址:${user.home} \store\${commitlog}\${fileName}

消息存儲結構:
  flag  這個標志值rocketmq不做處理,只存儲后透傳
  QUEUEOFFSET這個值是個自增值不是真正的consume queue的偏移量,可以代表這個隊列中消息的個數,要通過這個值查找到consume queue中數據,QUEUEOFFSET * 20才是偏移地址
  PHYSICALOFFSET 代表消息在commitLog中的物理起始地址偏移量
  SYSFLAG消息標志,指明消息是事物事物狀態等等消息特征
  BORNTIMESTAMP 消息產生端(producer)的時間戳
  BORNHOST     消息產生端(producer)地址(address:port)
  STORETIMESTAMP 消息在broker存儲時間
  STOREHOSTADDRESS 消息存儲到broker的地址(address:port)
  RECONSUMETIMES消息被某個訂閱組重新消費了幾次(訂閱組之間獨立計數),因為重試消息發送到了topic名字為%retry%groupName的隊列queueId=0的隊列中去了
  Prepared Transaction Offset 表示是prepared狀態的事物消息
復制代碼

2.1.2 ConsumeQueue文件組織:

ConsumerQueue相當於CommitLog的索引文件,消費者消費時會先從ConsumerQueue中查找消息的在commitLog中的offset,再去CommitLog中找元數據。

如果某個消息只在CommitLog中有數據,沒在ConsumerQueue中, 則消費者無法消費,Rocktet的事務消息就是這個原理

Consumequeue類對應的是每個topic和queuId下面的所有文件,相當於字典的目錄用來指定消息在消息的真正的物理文件commitLog上的位置

每條數據的結構如下圖所示:

消息的起始物理偏移量physical offset(long 8字節)+消息大小size(int 4字節)+tagsCode(long 8字節)。

  • 每個topic下的每個queue都有一個對應的consumequeue文件。
  • 文件默認存儲路徑:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName}
  • 每個文件由30W條數據組成,每條數據的大小為20個字節,從而每個文件的默認大小為600萬個字節(consume queue中存儲單元是一個20字節定長的數據)是順序寫順序讀
  • commitLogOffset是指這條消息在commitLog文件實際偏移量
  • size就是指消息大小
  • 消息tag的哈希值

 

ConsumeQueue幾個重要的字段
  private final String topic; private final int queueId;//隊列id private final ByteBuffer byteBufferIndex;// 寫索引時用到的ByteBuffer private long maxPhysicOffset = -1;// 最后一個消息對應的物理Offset

 

每個cosumequeue文件的名稱fileName,名字長度為20位,左邊補零,剩余為起始偏移量;

比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為600W,

當第一個文件滿之后創建的第二個文件的名字為00000000000006000000,起始偏移量為6000000,以此類推,

第三個文件名字為00000000000012000000,起始偏移量為12000000,消息存儲的時候會順序寫入文件,當文件滿了,寫入下一個文件。

  • topic queueId來組織的:比如TopicA配了讀寫隊列0、1,那么TopicA和Queue=0組成一個ConsumeQueue, TopicA和Queue=1組成一個另一個ConsumeQueue.
  • 按消費端group分組重試隊列,如果消費端消費失敗,發送到retry消費隊列中
  • 按消費端group分組死信隊列,如果消費端重試超過指定次數,發送死信隊列
  • 每個ConsumeQueue可以由多個文件組成無限隊列被MapedFileQueue對象管理

 2.1.3  MapedFile 是PageCache文件封裝

操作物理文件在內存中的映射以及將內存數據持久化到物理文件中,代碼中寫死了要求os系統的頁大小為4k, 消息刷盤根據參數(commitLog默認至少刷4頁, consumeQueue默認至少刷2頁)才刷 

以下io對象構建了物理文件映射內存的對象
FileChannel fileChannel = new RandomAccessFile(file,“rw”).getChannel();
MappedByteBuffer mappedByteBuffer=fileChannel.map(READE_WRITE,0,fileSize);
構建mapedFile對象需要兩個參數
fileSize: 映射的物理文件的大小
commitLog每個文件的大小默認1G =1024*1024*1024
ConsumeQueue每個文件默認存30W條 = 300000 *CQStoreUnitSize(每條大小)
filename: filename文件名稱但不僅僅是名稱還表示文件記錄的初始偏移量, 文件名其實是個long類型的值

2.1.4  MapedFileQueue 存儲隊列,數據定時刪除,無限增長。

隊列有多個文件(MapedFile)組成,由集合對象List表示升序排列,前面講到文件名即是消息在此文件的中初始偏移量,排好序后組成了一個連續的消息隊     

 當消息到達broker時,需要獲取最新的MapedFile寫入數據,調用MapedFileQueue的getLastMapedFile獲取,此函數如果集合中一個也沒有創建一個,如果最后一個寫滿了也創建一個新的。
 MapedFileQueue在獲取getLastMapedFile時,如果需要創建新的MapedFile會計算出下一個MapedFile文件地址,通過預分配服務AllocateMapedFileService異步預創建下一個MapedFile文件,這樣下次創建新文件請求就不要等待,因為創建文件特別是一個1G的文件還是有點耗時的,
 getMinOffset獲取隊列消息最少偏移量,即第一個文件的文件起始偏移量
 getMaxOffset獲取隊列目前寫到位置偏移量
 getCommitWhere刷盤刷到哪里了

 

2.1.5 消息存儲及消費過程

1)消息發送流程:

  • Broker啟動時,向NameServer注冊信息
  • 客戶端調用producer發送消息時,會先從NameServer獲取該topic的路由信息。消息頭code為GET_ROUTEINFO_BY_TOPIC
  • 從NameServer返回的路由信息,包括topic包含的隊列列表和broker列表
  • Producer端根據查詢策略,選出其中一個隊列,用於后續存儲消息
  • 每條消息會生成一個唯一id,添加到消息的屬性中。屬性的key為UNIQ_KEY
  • 對消息做一些特殊處理,比如:超過4M會對消息進行壓縮
  • producer向Broker發送rpc請求,將消息保存到broker端。消息頭的code為SEND_MESSAGE或SEND_MESSAGE_V2(配置文件設置了特殊標志)

消息存儲流程

  • Broker端收到消息后,將消息原始信息保存在CommitLog文件對應的MappedFile中,然后異步刷新到磁盤
  • ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
  • ConsumerQueue和IndexFile只是原始文件的索引信息

1)消息消費過程:

現在我們再來看 Broker 服務器端。首先我們應該知道,消息往 Broker 存儲就是在向 CommitLog 消息文件中寫入數據的一個過程。

在 Broker 啟動過程中,其會啟動一個叫做 ReputMessageService 的服務,這個服務每隔 1 秒會檢查一下這個 CommitLog 是否有新的數據寫入。

ReputMessageService 自身維護了一個偏移量 reputFromOffset,用以對比和 CommitLog 文件中的消息總偏移量的差距。

當這兩個偏移量不同的時候,就代表有新的消息到來了,在有新的消息到來之后,doReput() 函數會取出新到來的所有消息,每一條消息都會封裝為一個 DispatchRequest 請求,

進而將這條請求分發給不同的請求消費者,我們在這篇文章中只會關注利用消息創建消費隊列的服務 CommitLogDispatcherBuildConsumeQueue,

CommitLogDispatcherBuildConsumeQueue 服務會根據這條請求按照不同的隊列 ID 創建不同的消費隊列文件,並在內存中維護一份消費隊列列表。

然后將 DispatchRequest 請求中這條消息的消息偏移量、消息大小以及消息在發送時候附帶的標簽的 Hash 值寫入到相應的消費隊列文件中去。

3)客戶端如何記錄自己所消費的隊列消費到哪里了呢?

答案就是:消費隊列偏移量

集群模式:由於每個客戶端所消費的消息隊列不同,所以每個消息隊列已經消費到哪里的消費偏移量是記錄在 Broker 服務器端的。

廣播模式:由於每個客戶端分配消費這個話題的所有消息隊列,所以每個消息隊列已經消費到哪里的消費偏移量是記錄在客戶端本地的。

(1) 集群模式

在集群模式下,消費者客戶端在內存中維護了一個 offsetTable 表,同樣在 Broker 服務器端也維護了一個偏移量表,在消費者客戶端,RebalanceService 服務會定時地 (默認 20 秒) 從 Broker 服務器獲取當前客戶端所需要消費的消息隊列,並與當前消費者客戶端的消費隊列進行對比,看是否有變化。對於每個消費隊列,會從 Broker 服務器查詢這個隊列當前的消費偏移量。然后根據這幾個消費隊列,創建對應的拉取請求 PullRequest 准備從 Broker 服務器拉取消息,當從 Broker 服務器拉取下來消息以后,只有當用戶成功消費的時候,才會更新本地的偏移量表。本地的偏移量表再通過定時服務每隔 5 秒同步到 Broker 服務器端,而維護在 Broker 服務器端的偏移量表也會每隔 5 秒鍾序列化到磁盤中(文件地址:${user.home} /store/config/consume/consumerOffset.json)

保存的格式如下所示:

broker_offset_table

 

(2) 廣播模式

對於廣播模式而言,每個消費隊列的偏移量肯定不能存儲在 Broker 服務器端,因為多個消費者對於同一個隊列的消費可能不一致,偏移量會互相覆蓋掉。因此,在廣播模式下,每個客戶端的消費偏移量是存儲在本地的,然后每隔 5 秒將內存中的 offsetTable 持久化到磁盤中。當首次從服務器獲取可消費隊列的時候,偏移量不像集群模式下是從 Broker 服務器讀取的,而是直接從本地文件中讀取

這里提一下,在廣播模式下,消息隊列的偏移量默認放在用戶目錄下的 .rocketmq_offsets 目錄下

存儲格式如下:

broadcasting_offset_table_persist

 

3. load、recover

Broker啟動的時候需要加載一系列的配置,啟動一系列的任務,主要分布在BrokerController 的initialize()和start()方法中

復制代碼
1.加載topic配置
2.加載消費進度consumer offset
3.加載消費者訂閱關系consumer subscription
4.加載本地消息messageStore.load()
  Load 定時進度,Load commit log,commitLog其實調用存儲消費隊列mapedFileQueue.load()方法來加載的。
  遍歷出${user.home} \store\${commitlog}目錄下所有commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每個文件構建一個MapedFile對象, 在MapedFileQueue中用集合list把這些MapedFile文件組成一個邏輯上連續的隊列

  Load consume Queue
  遍歷${user.home} \store\consumequeue下的所有文件夾(每個topic就是一個文件夾)
  遍歷${user.home} \store\consumequeue\${topic}下的所有文件夾(每個queueId就是一個文件夾)
  遍歷${user.home} \store\consumequeue\${topic}\${queueId}下所有文件,根據topic, queueId, 文件來構建ConsueQueue對象
  DefaultMessageStore中存儲結構Map<topic,Map<queueId, CosnueQueue>>
  每個Consumequeue利用MapedFileQueue把mapedFile組成一個邏輯上連續的隊列

  加載事物模塊
  加載存儲檢查點
  加載${user.home} \store\checkpoint 這個文件存儲了3個long類型的值來記錄存儲模型最終一致的時間點,這個3個long的值為
  physicMsgTimestamp為commitLog最后刷盤的時間
  logicMsgTimestamp為consumeQueue最終刷盤的時間
  indexMsgTimestamp為索引最終刷盤時間
  checkpoint作用是當異常恢復時需要根據checkpoint點來恢復消息

  加載索引服務indexService
  recover嘗試數據恢復
  判斷是否是正常恢復,系統啟動的啟動存儲服務(DefaultMessageStore)的時候會創建一個臨時文件abort, 當系統正常關閉的時候會把這個文件刪掉,這個類似在Linux下打開vi編輯器生成那個臨時文件,所有當這個abort文件存在,系統認為是異常恢復 
復制代碼

 

復制代碼
1)  先按照正常流程恢復ConsumeQueue

什么是恢復ConsumeQueue, 前面不是有步驟load了ConsumeQueue嗎,為什么還要恢復?

前面load步驟創建了MapedFile對象建立了文件的內存映射,但是數據是否正確,現在文件寫到哪了(wrotePosition),
Flush到了什么位置(committedPosition)?恢復數據來幫我解決這些問題。 每個ConsumeQueue的mapedFiles集合中,從倒數第三個文件開始恢復(為什么只恢復倒數三個文件,看消息消費程度),
因為consumequeue的存儲單元是20字節的定長數據,所以是依次分別取了 Offset long類型存儲了commitLog的數據偏移量 Size int類型存儲了在commitLog上消息大小 tagcode tag的哈希值 目前rocketmq判斷存儲的consumequeue數據是否有效的方式為判斷offset>= 0 && size > 0 如果數據有效讀取下20個字節判斷是否有效 如果數據無效跳出循環,記錄此時有效數據的偏移量processOffset 如果讀到文件尾,讀取下一個文件 proccessOffset是有效數據的偏移量,獲取這個值的作用什么? (1)proccessOffset后面的數據屬於臟數據,后面的文件要刪除掉 (2)設置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值為 proccessOffset%mapedFileSize 2正常恢復commitLog文件 步驟跟流程恢復Consume Queue 判斷消息有效, 根據消息的存儲格式讀取消息到DispatchRequest對象,獲取消息大小值msgSize   大於 0 正常數據   等於-1 文件讀取錯誤 恢復結束   等於0 讀到文件末尾 3) 異常數據恢復,OSCRASH或者JVM CRASH或者機器掉電 當${user.home}\store\abort文件存在,代表異常恢復 讀取${user.home} \store\checkpoint獲取最終一致的時間點 判斷最終一致的點所在的文件是哪個 從最新的mapedFile開始,獲取存儲的一條消息在broker的生成時間,大於checkpoint時間點的放棄找前一個文件,小於等於checkpoint時間點的說明checkpoint
在此mapedfile文件中 從checkpoint所在mapedFile開始恢復數據,它的整體過程跟正常恢復commitlog類似,最重要的區別在於 (1)讀取消息后派送到分發消息服務DispatchMessageService中,來重建ConsumeQueue以及索引 (2)根據恢復的物理offset,清除ConsumeQueue多余的數據 4)恢復TopicQueueTable=Map<topic-queueid,offset> (1)恢復寫入消息時,消費記錄隊列的offset (2)恢復每個隊列的最小offset 初始化通信層 初始化線程池 注冊broker端處理器用來接收client請求后選擇處理器處理 啟動每天凌晨00:00:00統計消費量任務 啟動定時刷消費進度任務 啟動掃描數據被刪除了的topic,offset記錄也對應刪除任務 如果namesrv地址不是指定的,而是從靜態服務器取的,啟動定時向靜態服務器獲取namesrv地址的任務 如果broker是master,啟動任務打印slave落后master沒有同步的bytes 如果broker是slave,啟動任務定時到mastser同步配置信息
復制代碼

 

3. master slave

在broker啟動的時候BrokerController如果是slave,配置了master地址更新,沒有配置所有broker會想namesrv注冊,從namesrv獲取haServerAddr,然后更新到HAClient

當HAClient的MasterAddress不為空的時候(因為broker  master和slave都構建了HAClient)會主動連接master獲取SocketChannel Master監聽Slave請求的端口,默認為服務端口+1

接收slave上傳的offset long類型 int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8) 

//沒有理解意圖

 long readOffset =this.byteBufferRead.getLong(pos - 8);  this.processPostion = pos;

主從復制從哪里開始復制:如果請求時0 ,從最后一個文件開始復制

Slave啟動的時候brokerController開啟定時任務定時拷貝master的配置信息

復制代碼
SlaveSynchronize類代表slave從master同步信息(非消息)

         syncTopicConfig       同步topic的配置信息

         syncConsumerOffset       同步消費進度

         syncDelayOffset                同步定時進度

         syncSubcriptionGroupConfig  同步訂閱組配7F6E 
復制代碼

HaService類實現了HA服務,負責同步雙寫,異步復制功能, 這個類master和slave的broker都會實例化,

Master通過AcceptSocketService監聽slave的連接,每個masterslave連接都會構建一個HAConnection對象搭建他們之間的橋梁,對於一個master多slave部署結構的會有多個HAConnection實例,

Master構建HAConnection時會構建向slave寫入數據服務線程對象WriteSocketService對象和讀取Slave反饋服務線程對象ReadSocketService

WriteSocketService

向slave同步commitLog數據線程,

slaveRequestOffset是每次slave同步完數據都會向master發送一個ack表示下次同步的數據的offset。

如果slave是第一次啟動的話slaveRequestOffset=0, master會從最近那個commitLog文件開始同步。(如果要把master上的所有commitLog文件同步到slave的話, 把masterOffset值賦為minOffset)

向socket寫入同步數據: 傳輸數據協議<Phy Offset> <Body Size> <Body Data>

ReadSocketService:

4 ReadSocketService

復制代碼
   讀取slave通過HAClient向master返回同步commitLog的物理偏移量phyOffset值

         通知前端線程,如果是同步復制的話通知是否復制成功

 

Slave 通過HAClient建立與master的連接, 

來定時匯報slave最大物理offset,默認5秒匯報一次也代表了跟master之間的心跳檢測

讀取master向slave寫入commitlog的數據, master向slave寫入數據的格式是
復制代碼

復制代碼
Slave初始化DefaultMessageStore時候會構建ReputMessageService服務線程並在啟動存儲服務的start方法中被啟動

ReputMessageService的作用是slave從物理隊列(由commitlog文件構成的MapedFileQueue)加載數據,並分發到各個邏輯隊列

HA同步復制, 當msg寫入master的commitlog文件后,判斷maser的角色如果是同步雙寫SYNC_MASTER, 等待master同步到slave在返回結果
復制代碼

5 HA異步復制

6.索引服務

1索引結構

IndexFile 存儲具體消息索引的文件,文件的內容結構如圖:

 

索引文件由索引文件頭IndexHeader, 槽位Slot和消息的索引內容三部分構成

 

IndexHeader:索引文件頭信息40個字節的數據組成

 

復制代碼
 beginTimestamp    8位long類型,索引文件構建第一個索引的消息落在broker的時間

endTimestamp         8位long類型,索引文件構建最后一個索引消息落broker時間

beginPhyOffset         8位long類型,索引文件構建第一個索引的消息commitLog偏移量

endPhyOffset            8位long類型,索引文件構建最后一個索引消息commitLog偏移量

hashSlotCount    4位int類型,構建索引占用的槽位數(這個值貌似沒有具體作用)

indexCount                4位int類型,索引文件中構建的索引個數
復制代碼

 

槽位slot, 默認每個文件配置的slot個數為500萬個,每個slot是4位的int類型數據

計算消息的對應的slotPos=Math.abs(keyHash)%hashSlotNum

消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE

Slot存儲的值為消息個數索引

 

消息的索引內容是20位定長內容的數據

       

復制代碼
 4位int值, 存儲的是key的hash值

         8位long值     存儲的是消息在commitlog的物理偏移量phyOffset

         4位int值        存儲了當前消息跟索引文件中第一個消息在broker落地的時間差

         4位int值        如果存在hash沖突,存儲的是上一個消息的索引地址
復制代碼

 

7. 索引服務IndexService線程

復制代碼
1.      索引配置:hashSlotNum哈希槽位個數、indexNum存儲索引的最大個數、storePath索引文件indexFile存儲的路徑

2.      Load broker啟動的時候加載本地IndexFile,

如果是異常啟動刪除之后storeCheckPoint文件,因為commitLog根據storeCheckPoint會重建之后的索引文件,

3.      Run方法,任務從阻塞隊列中獲取請求構建索引

4.      queryOffset 根據topic key 時間跨度來查詢消息

倒敘遍歷所有索引文件

每一個indexfile存儲了第一個消息和最后一個消息的存儲時間,根據傳入時間范圍來判斷索引是否落在此索引文件
復制代碼

 

8. 構建索引服務

復制代碼
分發消息索引服務將消息位置分發到ConsumeQueue中后,加入IndexService的LinkedBlockingQueue隊列中,IndexService通過任務向隊列中獲取請求來構建索引

剔除commitType或者rollbackType消息,因為這兩種消息都有對應的preparedType的消息

構建索引key(topic + "#" + key)

根據key的hashcode計算槽位,即跟槽位最大值取余數

計算槽位在indexfile的具體偏移量位置

根據槽位偏移量獲取存儲的上一個索引

計算消息跟文件頭存儲開始時間的時間差

根據消息頭記錄的存儲消息個數計算消息索引存儲的集體偏移量位置

寫入真正的索引,內容參考上面索引內容格式

將槽位中的更新為此消息索引

更新索引頭文件信息
復制代碼

 

 

 

9.  Broker與client(comsumer ,producer)之間的心跳

復制代碼
一:Broker接收client心跳ClientManageProcessor處理client的心跳請求

1.      構建ClientChannelInfo對象

1)  持有channel對象,表示與客戶端的連接通道

2)  ClientID表示客戶端

…..

2.      每次心跳會更新ClientChannelInfo的時間戳,來表示client還活着

3.      注冊或者更新consumer的訂閱關系(是以group為單位來組織的, group下可能有多個訂閱關系)

4.      注冊producer,其實就是發送producer的group(這個在事物消息中才有點作用)

二:ClientHouseKeepingService線程定時清除不活動的連接

1)  ProducerManager.scanNotActiveChannel    默認兩分鍾producer沒有發送心跳清除

2)  ConsumerManager.scanNotActiveChannel   默認兩份中Consumer沒有發送心跳清除
復制代碼

 

10. Broker與namesrv之間的心跳

復制代碼
1)  namesrv接收borker心跳DefaultRequestProcessor的REGISTER_BROKE事件處理,

(1)      注冊broker的topic信息

(2)      構建或者更新BrokerLiveInfo的時間戳

NamesrvController初始化時啟動線程定時調用RouteInfoManger的scanNotActiveBroker方法來定時不活動的broker(默認兩分鍾沒有向namesrv發送心跳更新時間戳的) 
復制代碼
 

 

 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM