broker 1. broker的啟動
brker的啟動
Broker向namesrv注冊
1. 獲取namesrv的地址列表(是亂序的)
2. 遍歷向每個namesrv注冊topic的配置信息topicconfig
Topic在broker文件上的存儲json格式
"TopicTest":{ "perm":6, "readQueueNums":8, "topicFilterType":"SINGLE_TAG", "topicName":"TopicTest", "writeQueueNums":8 }
Namesrv接收Broker注冊的topic信息, namesrv只存內存,但是broker有任務定時推送
1. 接收數據向RouteInfoManager注冊。
Broker初始化加載本地配置,配置信息是以json格式存儲在本地, rocketmq強依賴fastjson作轉換, RocketMq通過ConfigMananger來管理配置加載以及持久化

1. 加載topic配置${user.home}/store/config/topics.json { "dataVersion":{ "counter":2, "timestatmp":1393729865073 }, "topicConfigTable":{ //根據consumer的group生成的重試topic "%RETRY% group_name":{ "perm":6, "readQueueNums":1, "topicFilterType":"SINGLE_TAG", "topicName":"%RETRY%group_name", "writeQueueNums":1 }, "TopicTest":{ "perm":6, // 100讀權限 , 10寫權限 6是110讀寫權限 "readQueueNums":8, "topicFilterType":"SINGLE_TAG", "topicName":"TopicTest", "writeQueueNums":8 } } } 2. 加載消費進度偏移量 ${user.home}/store/config/consumerOffset.json { "offsetTable":{ "%RETRY% group_name@group_name":{ 0:0 //重試隊列消費進度為零 }, "TopicTest@ group_name":{ 0:23,1:23,2:22,3:22,4:21,5:18,6:18,7:18 //分組名group_name消費topic為TopicTest的進度為: // 隊列queue=0 消費進度23 // 隊列 queue=2 消費進度為22 等等… } } } 3. 加載消費者訂閱關系 ${user.home}/store/config/subscriptionGroup.json { "dataVersion":{ "counter":1, "timestatmp":1393641744664 }, "group_name":{ "brokerId":0, //0代表這台broker機器為master,若要設為slave值大於0 "consumeBroadcastEnable":true, "consumeEnable":true, "consumeFromMinEnable":true, "groupName":"group_name", "retryMaxTimes":5, "retryQueueNums":1, "whichBrokerWhenConsumeSlowly":1 } } }
broker 2. broker的消息存儲
Rocketmq的消息的存儲是由consume queue和 commitLog 配合完成的
1) consume queue 消息的邏輯隊列,相當於字典的目錄用來指定消息在消息的真正的物理文件commitLog上的位置, 每個topic下的每個queue都有一個對應的consumequeue文件。 文件地址:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName} consume queue中存儲單元是一個20字節定長的數據,是順序寫順序讀 (1) commitLogOffset是指這條消息在commitLog文件實際偏移量 (2) size就是指消息大小 (3) 消息tag的哈希值
ConsumeQueue文件組織:
(1) topic queueId來組織的,比如TopicA配了讀寫隊列0, 1,那么TopicA和Queue=0組成一個ConsumeQueue,TopicA和Queue=1組成一個另一個ConsumeQueue. (2) 按消費端group分組重試隊列,如果消費端消費失敗,發送到retry消費隊列中 (3) 按消費端group分組死信隊列,如果消費端重試超過指定次數,發送死信隊列 (4) 每個ConsumeQueue可以由多個文件組成無限隊列被MapedFileQueue對象管理
2) CommitLog消息存放物理文件,每台broker上的commitLog被本機器所有queue共享不做區分
文件地址:${user.home} \store\${commitlog}\${fileName} 一個消息存儲單元長度是不定的,順序寫但是隨機讀 消息存儲結構: = 4 //4個字節代表這個消息的大小 + 4 //四個字節的MAGICCODE = daa320a7 + 4 //消息體BODY CRC 當broker重啟recover時會校驗 + 4 //queueId 你懂得 + 4 //flag 這個標志值rocketmq不做處理,只存儲后透傳 + 8 //QUEUEOFFSET這個值是個自增值不是真正的consume queue的偏移量,可以代表這個隊列中消息的個數,要通過這個值查找到consume queue中數據,QUEUEOFFSET * 20才是偏移地址 + 8 //PHYSICALOFFSET 代表消息在commitLog中的物理起始地址偏移量 + 4 //SYSFLAG消息標志,指明消息是事物事物狀態等等消息特征 + 8 //BORNTIMESTAMP 消息產生端(producer)的時間戳 + 8 //BORNHOST 消息產生端(producer)地址(address:port) + 8 //STORETIMESTAMP 消息在broker存儲時間 + 8 //STOREHOSTADDRESS 消息存儲到broker的地址(address:port) + 8 //RECONSUMETIMES消息被某個訂閱組重新消費了幾次(訂閱組之間獨立計數),因為重試消息發送到了topic名字為%retry%groupName的隊列queueId=0的隊列中去了 + 8 //Prepared Transaction Offset 表示是prepared狀態的事物消息 + 4 + bodyLength // 前4個字節存放消息體大小值, 后bodylength大小空間存儲了消息體內容 + 1 + topicLength //一個字節存放topic名稱能容大小, 后存放了topic的內容 + 2 + propertiesLength // 2個字節(short)存放屬性值大小, 后存放propertiesLength大小的屬性數據
3) MapedFile 是PageCache文件封裝,操作物理文件在內存中的映射以及將內存數據持久化到物理文件中,代碼中寫死了要求os系統的頁大小為4k, 消息刷盤根據參數(commitLog默認至少刷4頁, consumeQueue默認至少刷2頁)才刷
以下io對象構建了物理文件映射內存的對象 FileChannel fileChannel = new RandomAccessFile(file,“rw”).getChannel(); MappedByteBuffer mappedByteBuffer=fileChannel.map(READE_WRITE,0,fileSize); 構建mapedFile對象需要兩個參數 fileSize: 映射的物理文件的大小 commitLog每個文件的大小默認1G =1024*1024*1024 ConsumeQueue每個文件默認存30W條 = 300000 *CQStoreUnitSize(每條大小) filename: filename文件名稱但不僅僅是名稱還表示文件記錄的初始偏移量, 文件名其實是個long類型的值
4) MapedFileQueue 存儲隊列,數據定時刪除,無限增長。
隊列有多個文件(MapedFile)組成,由集合對象List表示升序排列,前面講到文件名即是消息在此文件的中初始偏移量,排好序后組成了一個連續的消息隊
當消息到達broker時,需要獲取最新的MapedFile寫入數據,調用MapedFileQueue的getLastMapedFile獲取,此函數如果集合中一個也沒有創建一個,如果最后一個寫滿了也創建一個新的。
MapedFileQueue在獲取getLastMapedFile時,如果需要創建新的MapedFile會計算出下一個MapedFile文件地址,通過預分配服務AllocateMapedFileService異步預創建下一個MapedFile文件,這樣下次創建新文件請求就不要等待,因為創建文件特別是一個1G的文件還是有點耗時的,
getMinOffset獲取隊列消息最少偏移量,即第一個文件的文件起始偏移量
getMaxOffset獲取隊列目前寫到位置偏移量
getCommitWhere刷盤刷到哪里了
5) DefaultMessageStore 消息存儲層實現
(1)putMessage 添加消息委托給commitLog.putMessage(msg),主要流程: <1> 從mapedFileQueue獲取最新的映射文件 <2>向mapedFile中添加一條消息記錄 <3> 構建DispatchRequest對象,添加到分發索引服務DispatchMessageService線程中去 <4>喚醒異步刷盤線程 <5> 向發送方返回結果 (2)DispatchMessageService <1>分發消息位置到ConsumeQueue <2>分發到IndexService建立索引
broker 3. load&recover
Broker啟動的時候需要加載一系列的配置,啟動一系列的任務,主要分布在BrokerController 的initialize()和start()方法中
1. 加載topic配置 2. 加載消費進度consumer offset 3. 加載消費者訂閱關系consumer subscription 4. 加載本地消息messageStore.load() a) Load 定時進度 b) Load commit log commitLog其實調用存儲消費隊列mapedFileQueue.load()方法來加載的。 遍歷出${user.home} \store\${commitlog}目錄下所有commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每個文件構建一個MapedFile對象, 在MapedFileQueue中用集合list把這些MapedFile文件組成一個邏輯上連續的隊列 c) Load consume Queue 遍歷${user.home} \store\consumequeue下的所有文件夾(每個topic就是一個文件夾) 遍歷${user.home} \store\consumequeue\${topic}下的所有文件夾(每個queueId 就是一個文件夾) 遍歷${user.home} \store\consumequeue\${topic}\${queueId}下所有文件,根據topic, queueId, 文件來構建ConsueQueue對象 DefaultMessageStore中存儲結構Map<topic,Map<queueId, CosnueQueue>> 每個Consumequeue利用MapedFileQueue把mapedFile組成一個邏輯上連續的隊列 d) 加載事物模塊 e) 加載存儲檢查點 加載${user.home} \store\checkpoint 這個文件存儲了3個long類型的值來記錄存儲模型最終一致的時間點,這個3個long的值為 physicMsgTimestamp為commitLog最后刷盤的時間 logicMsgTimestamp為consumeQueue最終刷盤的時間 indexMsgTimestamp為索引最終刷盤時間 checkpoint作用是當異常恢復時需要根據checkpoint點來恢復消息 f) 加載索引服務indexService g) recover嘗試數據恢復 判斷是否是正常恢復,系統啟動的啟動存儲服務(DefaultMessageStore)的時候會創建一個臨時文件abort, 當系統正常關閉的時候會把這個文件刪掉,這個類似在Linux下打開vi編輯器生成那個臨時文件,所有當這個abort文件存在,系統認為是異常恢復
1) 先按照正常流程恢復ConsumeQueue 為什么說先正常恢復,那么異常恢復在哪呢?當broker是異常啟動時候,在異常恢復commitLog時會重新構建請到DispatchMessageService服務,來重新生成ConsumeQueue數據,
索引以及事物消息的redolog 什么是恢復ConsumeQueue, 前面不是有步驟load了ConsumeQueue嗎,為什么還要恢復? 前面load步驟創建了MapedFile對象建立了文件的內存映射,但是數據是否正確,現在文件寫到哪了(wrotePosition),
Flush到了什么位置(committedPosition)?恢復數據來幫我解決這些問題。 每個ConsumeQueue的mapedFiles集合中,從倒數第三個文件開始恢復(為什么只恢復倒數三個文件,也許只是個經驗值吧),
因為consumequeue的存儲單元是20字節的定長數據,所以是依次分別取了 Offset long類型存儲了commitLog的數據偏移量 Size int類型存儲了在commitLog上消息大小 tagcode tag的哈希值 目前rocketmq判斷存儲的consumequeue數據是否有效的方式為判斷offset>= 0 && size > 0 如果數據有效讀取下20個字節判斷是否有效 如果數據無效跳出循環,記錄此時有效數據的偏移量processOffset 如果讀到文件尾,讀取下一個文件 proccessOffset是有效數據的偏移量,獲取這個值的作用什么? (1) proccessOffset后面的數據屬於臟數據,后面的文件要刪除掉 (2) 設置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值為 proccessOffset%mapedFileSize 2) 正常恢復commitLog文件 步驟跟流程恢復Consume Queue 判斷消息有效, 根據消息的存儲格式讀取消息到DispatchRequest對象,獲取消息大小值msgSize 大於 0 正常數據 等於-1 文件讀取錯誤 恢復結束 等於0 讀到文件末尾 3) 異常數據恢復,OSCRASH或者JVM CRASH或者機器掉電 當${user.home}\store\abort文件存在,代表異常恢復 讀取${user.home} \store\checkpoint獲取最終一致的時間點 判斷最終一致的點所在的文件是哪個 從最新的mapedFile開始,獲取存儲的一條消息在broker的生成時間,大於checkpoint時間點的放棄找前一個文件,小於等於checkpoint時間點的說明checkpoint
在此mapedfile文件中 從checkpoint所在mapedFile開始恢復數據,它的整體過程跟正常恢復commitlog類似,最重要的區別在於 (1)讀取消息后派送到分發消息服務DispatchMessageService中,來重建ConsumeQueue以及索引 (2)根據恢復的物理offset,清除ConsumeQueue多余的數據 4) 恢復TopicQueueTable=Map<topic-queueid,offset> (1) 恢復寫入消息時,消費記錄隊列的offset (2) 恢復每個隊列的最小offset 5. 初始化通信層 6. 初始化線程池 7. 注冊broker端處理器用來接收client請求后選擇處理器處理 8. 啟動每天凌晨00:00:00統計消費量任務 9. 啟動定時刷消費進度任務 10. 啟動掃描數據被刪除了的topic,offset記錄也對應刪除任務 11. 如果namesrv地址不是指定的,而是從靜態服務器取的,啟動定時向靜態服務器獲取namesrv地址的任務 12. 如果broker是master,啟動任務打印slave落后master沒有同步的bytes 如果broker是slave,啟動任務定時到mastser同步配置信息
broker 4. HA&master slave
在broker啟動的時候BrokerController如果是slave,配置了master地址更新,沒有配置所有broker會想namesrv注冊,從namesrv獲取haServerAddr,然后更新到HAClient
當HAClient的MasterAddress不為空的時候(因為broker master和slave都構建了HAClient)會主動連接master獲取SocketChannel Master監聽Slave請求的端口,默認為服務端口+1
接收slave上傳的offset long類型 int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8)
//沒有理解意圖
long readOffset =this.byteBufferRead.getLong(pos - 8); this.processPostion = pos;
主從復制從哪里開始復制:如果請求時0 ,從最后一個文件開始復制
Slave啟動的時候brokerController開啟定時任務定時拷貝master的配置信息
SlaveSynchronize類代表slave從master同步信息(非消息)
syncTopicConfig 同步topic的配置信息
syncConsumerOffset 同步消費進度
syncDelayOffset 同步定時進度
syncSubcriptionGroupConfig 同步訂閱組配7F6E
HaService類實現了HA服務,負責同步雙寫,異步復制功能, 這個類master和slave的broker都會實例化,
Master通過AcceptSocketService監聽slave的連接,每個masterslave連接都會構建一個HAConnection對象搭建他們之間的橋梁,對於一個master多slave部署結構的會有多個HAConnection實例,
Master構建HAConnection時會構建向slave寫入數據服務線程對象WriteSocketService對象和讀取Slave反饋服務線程對象ReadSocketService
WriteSocketService
向slave同步commitLog數據線程,
slaveRequestOffset是每次slave同步完數據都會向master發送一個ack表示下次同步的數據的offset。
如果slave是第一次啟動的話slaveRequestOffset=0, master會從最近那個commitLog文件開始同步。(如果要把master上的所有commitLog文件同步到slave的話, 把masterOffset值賦為minOffset)
向socket寫入同步數據: 傳輸數據協議<Phy Offset> <Body Size> <Body Data>
ReadSocketService:
4.2 ReadSocketService
讀取slave通過HAClient向master返回同步commitLog的物理偏移量phyOffset值
通知前端線程,如果是同步復制的話通知是否復制成功
Slave 通過HAClient建立與master的連接,
來定時匯報slave最大物理offset,默認5秒匯報一次也代表了跟master之間的心跳檢測
讀取master向slave寫入commitlog的數據, master向slave寫入數據的格式是
Slave初始化DefaultMessageStore時候會構建ReputMessageService服務線程並在啟動存儲服務的start方法中被啟動
ReputMessageService的作用是slave從物理隊列(由commitlog文件構成的MapedFileQueue)加載數據,並分發到各個邏輯隊列
HA同步復制, 當msg寫入master的commitlog文件后,判斷maser的角色如果是同步雙寫SYNC_MASTER, 等待master同步到slave在返回結果
3 HA異步復制

broker 6.索引服務
1索引結構
IndexFile 存儲具體消息索引的文件,文件的內容結構如圖:
索引文件由索引文件頭IndexHeader, 槽位Slot和消息的索引內容三部分構成
IndexHeader:索引文件頭信息40個字節的數據組成
beginTimestamp 8位long類型,索引文件構建第一個索引的消息落在broker的時間
endTimestamp 8位long類型,索引文件構建最后一個索引消息落broker時間
beginPhyOffset 8位long類型,索引文件構建第一個索引的消息commitLog偏移量
endPhyOffset 8位long類型,索引文件構建最后一個索引消息commitLog偏移量
hashSlotCount 4位int類型,構建索引占用的槽位數(這個值貌似沒有具體作用)
indexCount 4位int類型,索引文件中構建的索引個數
槽位slot, 默認每個文件配置的slot個數為500萬個,每個slot是4位的int類型數據
計算消息的對應的slotPos=Math.abs(keyHash)%hashSlotNum
消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE
Slot存儲的值為消息個數索引
消息的索引內容是20位定長內容的數據
4位int值, 存儲的是key的hash值
8位long值 存儲的是消息在commitlog的物理偏移量phyOffset
4位int值 存儲了當前消息跟索引文件中第一個消息在broker落地的時間差
4位int值 如果存在hash沖突,存儲的是上一個消息的索引地址
2. 索引服務IndexService線程
1. 索引配置:hashSlotNum哈希槽位個數、indexNum存儲索引的最大個數、storePath索引文件indexFile存儲的路徑 2. Load broker啟動的時候加載本地IndexFile, 如果是異常啟動刪除之后storeCheckPoint文件,因為commitLog根據storeCheckPoint會重建之后的索引文件, 3. Run方法,任務從阻塞隊列中獲取請求構建索引 4. queryOffset 根據topic key 時間跨度來查詢消息 倒敘遍歷所有索引文件 每一個indexfile存儲了第一個消息和最后一個消息的存儲時間,根據傳入時間范圍來判斷索引是否落在此索引文件
3. 構建索引服務
分發消息索引服務將消息位置分發到ConsumeQueue中后,加入IndexService的LinkedBlockingQueue隊列中,IndexService通過任務向隊列中獲取請求來構建索引 剔除commitType或者rollbackType消息,因為這兩種消息都有對應的preparedType的消息 構建索引key(topic + "#" + key) 根據key的hashcode計算槽位,即跟槽位最大值取余數 計算槽位在indexfile的具體偏移量位置 根據槽位偏移量獲取存儲的上一個索引 計算消息跟文件頭存儲開始時間的時間差 根據消息頭記錄的存儲消息個數計算消息索引存儲的集體偏移量位置 寫入真正的索引,內容參考上面索引內容格式 將槽位中的更新為此消息索引 更新索引頭文件信息
4. Broker與client(comsumer ,producer)之間的心跳,
一:Broker接收client心跳ClientManageProcessor處理client的心跳請求 1. 構建ClientChannelInfo對象 1) 持有channel對象,表示與客戶端的連接通道 2) ClientID表示客戶端 ….. 2. 每次心跳會更新ClientChannelInfo的時間戳,來表示client還活着 3. 注冊或者更新consumer的訂閱關系(是以group為單位來組織的, group下可能有多個訂閱關系) 4. 注冊producer,其實就是發送producer的group(這個在事物消息中才有點作用) 二:ClientHouseKeepingService線程定時清除不活動的連接 1) ProducerManager.scanNotActiveChannel 默認兩分鍾producer沒有發送心跳清除 2) ConsumerManager.scanNotActiveChannel 默認兩份中Consumer沒有發送心跳清除
5. Broker與namesrv之間的心跳
1) namesrv接收borker心跳DefaultRequestProcessor的REGISTER_BROKE事件處理, (1) 注冊broker的topic信息 (2) 構建或者更新BrokerLiveInfo的時間戳 NamesrvController初始化時啟動線程定時調用RouteInfoManger的scanNotActiveBroker方法來定時不活動的broker(默認兩分鍾沒有向namesrv發送心跳更新時間戳的)