要持久化的關鍵數據有三種
- 消息;
- 隊列,隊列中存放的是消息索引信息,即消息在文件中的物理位置(messageOffset)和在隊列中的邏輯位置(queueOffset)的映射信息;
- 隊列消費進度,表示當前隊列中的消息消費到第幾個了;
發送消息的設計
- producer將消息的二進制數據發送到broker;
- broker做的事情:
- 單線程持久化消息到內存映射文件;
- 將當前消息的索引信息放入緩沖區,可以使用disruptor的ringbuffer實現,單線程寫,無鎖。
- 單線程從緩沖區讀取消息索引信息,並將索引信息寫入內存映射文件;
- 消息的內存映射文件、消息索引的內存映射文件都定時刷新到磁盤,比如每隔1s刷新一次,可配置;
- broker將當前消息的索引信息放入緩沖區后,就立即返回了,然后producer就收到了消息發送的結果;
其他說明:
- 因為不可能用一個文件來保存所有的消息,所以肯定是用多個文件的方式。也就是說,無論是保存消息還是保存消息索引,都用多個文件。另外,由於隊列有多個,所以每個隊列都對應多個內存映射文件。隊列文件的目錄命名規則:rootPath / topic / queueId / queue mapped files
- broker在將消息的索引信息放入緩沖區時,要檢查緩沖區是否到達一定的水位,比如ringbuffer總大小100W個槽,假如水位是80%,那就是當現在ringbuffer中可用的槽不到20%時,應該要做流控,比如sleep 100s;理論上應該不會到達水位,因為寫消息索引肯定比寫消息本身要快;
消費消息的設計
- consumer告訴broker當前需要拉取哪個topic下的哪個隊列里的第幾個位置(queueOffset)開始的消息,並告訴要最多拉取多少個消息;
- broker根據topic和queueId找到對應的隊列;
- 根據queueOffset從隊列拿到消息在文件中的物理位置,即messageOffset;
- 根據messageOffset從消息的內存映射文件獲取消息二進制數據;
- 將消息二進制數據寫入臨時的內存流里,該內存流里包含了所有要返回的消息;
- 消息拉取數量達到要求或沒有新的消息可以拉取后,將內存流對應的二進制數據返回給consumer;
- consumer解析二進制數據,得到所有的消息對象;
broker定時清理過期的消息和消息索引
- 每隔10s掃描是否有過期的消息文件,過期時間可配置,比如三天;掃描時,發現文件的最后修改時間是3天前,則刪除;
- 每隔10s掃描是否有過期的消息索引文件,判斷是否過期的依據是掃描每個消息索引文件,判斷該文件中的最后一個消息索引的messageOffset是否比最小的messageOffset還要小;如果小,就說明這個消息索引文件已經無意義了,可以刪除;
broker啟動時的邏輯
- 掃描磁盤上所有的消息的存儲文件,為每個文件建立內存映射;
- 掃描磁盤上所有的隊列(消息索引)的存儲文件,為每個文件建立內存映射;
- 對每個隊列,預恢復幾個文件(比如最后的3個文件)的數據到內存,剩余的用到時再恢復;
- 同理,對於存儲消息的文件,也預恢復幾個(比如最后的3個文件)到內存;一般大部分消息者只要消費進度不是太慢,總是應該已經趕上了最后那三個文件了;
- 關於異常關閉broker時的邏輯,暫時還沒想清楚,還需要再細思;
