消息中間件核心實體(0)
最近兩周在做的一個新項目,一個主從復制的組件,這兩天剛跑通測試。
從之前討論的架構來說,消息中間件也是有主從復制這個模塊的,像Rocket就支持主從模式。
在做這個項目之前已經寫過兩個版本的主從復制模塊,基本思路是:
- Slave主動和Master建立鏈接
- Slave從Master不斷Pull數據
- 並ack進度給Master
- Master根據Slave的進度來支持異步復制、半同步復制的語義
- Slave上有replay線程根據復制數據恢復上層狀態
也可以采用Master向Slave push數據的方式(如果自己做主從復制,一定要去了解MySQL的主從復制實現)。
已經做了兩個版本的主從復制了,為什么最近又會起新項目去做這個事情呢?因為我們意識到主從復制其實是一個相對獨立的模塊,和上層的消息業務並不相關。比如DB、或者持久化KV存儲去做高可用方案的話可能都會涉及到主從復制這樣一個模塊。
所以我們想說能不能把主從模塊從消息中間件中剝離出來,寫成一個相對獨立的模塊。從確定這么做到完成第一個可以run的版本,花了兩周時間,其中有8、9天在進行設計和領域建模(核心實體的定義),編碼也就4、5天的樣子。這也是這個版本和之前版本最大的區別,我們花了大量的時間去抽象實體,最后在編碼上反而會簡單很多;而之的版本抽象層次太低,有太多過程化的編碼,雖然能run也沒什么問題,但總是不夠“優雅”。
說了這么多其實是想說,定義好實體基本上可以說完成項目編碼的百分之三四十了。好的實體定義(領域模型)會讓之后系統的實現變得簡單。
廢話說了這么多,接着談一談消息中間件中一些重要的實體和組件。
消息
Message
消息實體是消息中間件中最重要的對象了,關乎到用戶能寫入什么、消費什么,關乎到索引結構的設計。
一條消息最基礎的屬性有:
- topic
- content\body
topic表示這條消息屬於哪個主題,這樣最終Consumer可以通過訂閱這個topic來消費這條消息。content或body,消息內容或消息體(RocketMQ是body,我們習慣叫content),它是一個byte數組。因為作為消息中間件我們只會去存儲數據,數據的編解碼是有用戶自己決定的。
出去最最基礎的這兩個屬性,在使用消息中間件時往往會有過濾的需求。比如可能交易業務會將所有的訂單發送到一個topic,這時下游的業務方需要關注自己的業務,可能一些是需要處理虛擬商品的業務,一些是需要處理特普通商品的,這樣就需要每個業務方能過濾出自己需要的消息。
所以Message往往會有一個tag屬性:
- topic
- tag
- content\body
用於做消息過濾。tag屬性是一個String類型的,每條消息可以有一個tag,我們稱為打標,Consumer在訂閱消息的時候可以指定自己需要的一批tag。
RocketMQ中(開源版本)也這樣去實現了,但是它將消息所有的屬性放入到一個Map中:
- properties:Map<String,String>
(不知道RocketMQ有沒有支持多tag的版本,我們遇到過希望消息有多個tag的情況。這也是個挺正常的需求,比如可以從不同維護划分消息,比如支付類型+商品類型等,過濾的時候是一個and的邏輯,這個可以作為一個功能提升的考慮)
RocketMQ的Message properties中還有key、delaylevel、waitstore等屬性,分別用於查詢消息、設置延遲投遞、是否等待刷盤等。
以上是暴露給用戶的Message對象的基礎屬性,也決定了用戶能執行的操作無非是配置上面一些內容。
MessageExt
Message是基礎的消息,對於系統內部發送和消費時這些屬性是不夠的,所以內部回去拓展一個MessageExt,包含額外的一些屬性:
- id:消息的ID,可以考慮能否用一個long類型來做成全局唯一的,這樣可以基於它做冪等之類的操作
- queueId:目標的隊列ID(這條消息最終落到哪個分區中——分區即隊列,每個分區都是一個先進先出的隊列)
- bornTime:消息的產生時間
- bornAddress:消息的產生地址
- storeTime:消息的存儲時間
- crc:crc校驗
- ...
主題
Topic
主題相關的,最基礎的實體是Topic,它描述了主題最基礎的屬性,比如名稱、負責人等。
- name
- owner
owner信息可以是topic所屬的團隊或責任人等,主要用戶在發生異常或其他需要人工反饋的場景下,能找到對應的人或者發送告警。
RocketMQ中對應的是TopicConfig實體,描述了主題最基礎的屬性:
- topicName
- readQueueNums
- writeQueueNums
- perm:讀寫模式
- topicFilterType:過濾類型(只支持單tag,尚不支持多tag)
- topicSysFlag:系統屬性
- order:是否順序?
其中一些屬性並沒有很理解,比如readQueueNums和writeQueueNums,一個topic不應該有多少讀的隊列就有多少寫的隊列嗎?(沒有實踐中使用RocketMQ的經驗,還望了解的同學指教)
Topic元數據
和主題相關的最重要的實體應該是隊列的分布情況,即一個Topic包含了哪些隊列,把這個元數據暫且成為TopicMeta。
一個TopicMeta對象需要有隊列的部分情況,這樣,
- 在發送時,根據消息的topic屬性,獲取到TopicMeta再從中獲取隊列信息,然后寫入到特定的隊列中
- 在消費時,獲取隊列信息,然后從每個隊列中獲取數據
在第一次考慮這個實體的時候,它大概是這個樣子的:
- Topic topic:包含一個Topic實體,表明基礎信息
- int queueNums:包含的隊列數
- List broker:分布的Broker節點
這個結構能滿足需求。
Producer拿到TopicMeta后,根據brokers.size * queueNums得到總分區數,每次發送消息時根據一定的路由策略選擇一個分區(隊列)作為目標分區進行寫入。
Consumer拿到TopicMeta后,知道所有的broker,知道分區數,這樣就知道了所有的分區情況(每個Broker上同一個Topic擁有相同數據的分區,編號為[0, queueNums-1]),能建立所有分區的消費關系。
但是在不斷的實踐中,發現這種模式並不是一種很好的抽象:
- 在對Topic進行擴容和縮容的時候,只能以Broker為單位,即每次擴容或縮容的分區數都是queueNums的倍數
- 隱含了一層關系,即客戶端知道總分區數的計算規則和分區的分布規則
對TopicMeta的抽象應該是真實的描述Topic的隊列的分布情況,所以TopicMeta應該包含所有的隊列的分布情況,應該包含一個Set或List集合,里面包含了所有的隊列。
TopicMeta
- Topic topic
- Set/List queues:隊列信息(Queue描述了自身的信息)
Kafka的實現中是Topic信息包含了所有隊列的信息,使用了一個Map去存儲,Key是一個Integer,應該是Partition的ID。
隊列
Queue是消息聚合的最小單位,一個Queue應該反映出自身所處的物理地址,這樣可以進行寫入和消費,另外應該包含一些狀態來描述是否可讀可寫。另外應該有它的備份信息(高可用是每個部分都應該考慮的),即這個隊列的備份隊列分布等。
Kafka中這個對象叫TopicPartitionInfo,包含屬性如下:
- int partition
- Node leader
- List<Node> replicas
- List<Node> isr
這個實體定義相對來說是比較好的,描述了這個隊列當前的Leader,它的備份,也就是每個隊列都是可以進行主備切換的(回想一下,Kafka中每個Broker相互備份Partition的,而不是Broker之間的主從備份)。在客戶端也不會隱含什么規則,而是直接根據路由策略來使用分區(隊列)。
小結
消息中間件模型中遠遠不止上面這一些實體,但是不希望篇幅太長(看起來太累),所以打算拆開成幾篇。
這篇主要是基礎的實體,下一篇會寫和核心流程相關的一些實體,主要會是路由、數據讀取等。