前言:
分布式消息系統作為實現分布式系統可擴展、可伸縮性的關鍵組件,需要具有高吞吐量、高可用等特點。而談到消息系統的設計,就回避不了兩個問題:
- 消息的順序問題
- 消息的重復問題
RocketMQ作為阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ有哪些關鍵特性?其實現原理是怎樣的?
順序消息
消息有序指的是可以按照消息的發送順序來消費。例如:一筆訂單產生了 3 條消息,分別是訂單創建、訂單付款、訂單完成。消費時,要按照順序依次消費才有意義。與此同時多筆訂單之間又是可以並行消費的。 首先來看如下示例:
假如生產者產生了2條消息:M1、M2,要保證這兩條消息的順序,應該怎樣做?你腦中想到的可能是這樣:
你可能會采用這種方式保證消息順序
假定M1發送到S1,M2發送到S2,如果要保證M1先於M2被消費,那么需要M1到達消費端被消費后,通知S2,然后S2再將M2發送到消費端。
這個模型存在的問題是,如果M1和M2分別發送到兩台Server上,就不能保證M1先達到MQ集群,也不能保證M1被先消費。換個角度看,如果M2先於M1達到MQ集群,甚至M2被消費后,M1才達到消費端,這時消息也就亂序了,說明以上模型是不能保證消息的順序的。 如何才能在MQ集群保證消息的順序?一種簡單的方式就是將M1、M2發送到同一個Server上:
保證消息順序,你改進后的方法
這樣可以保證M1先於M2到達MQServer(生產者等待M1發送成功后再發送M2),根據先達到先被消費的原則,M1會先於M2被消費,這樣就保證了消息的順序。
這個模型也僅僅是理論上可以保證消息的順序,在實際場景中可能會遇到下面的問題:
網絡延遲問題
只要將消息從一台服務器發往另一台服務器,就會存在網絡延遲問題。如上圖所示,如果發送M1耗時大於發送M2的耗時,那么M2就仍將被先消費,仍然不能保證消息的順序。即使M1和M2同時到達消費端,由於不清楚消費端1和消費端2的負載情況,仍然有可能出現M2先於M1被消費的情況。
那如何解決這個問題?將M1和M2發往同一個消費者,且發送M1后,需要消費端響應成功后才能發送M2。
聰明的你可能已經想到另外的問題:如果M1被發送到消費端后,消費端1沒有響應,那是繼續發送M2呢,還是重新發送M1?一般為了保證消息一定被消費,肯定會選擇重發M1到另外一個消費端2,就如下圖所示。
保證消息順序的正確姿勢
這樣的模型就嚴格保證消息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種情況,一種是M1確實沒有到達(數據在網絡傳送中丟失),另外一種消費端已經消費M1且已經發送響應消息,只是MQ Server端沒有收到。如果是第二種情況,重發M1,就會造成M1被重復消費。也就引入了我們要說的第二個問題,消息重復問題,這個后文會詳細講解。
回過頭來看消息順序問題,嚴格的順序消息非常容易理解,也可以通過文中所描述的方式來簡單處理。總結起來,要實現嚴格的順序消息,簡單且可行的辦法就是:
保證生產者 - MQServer - 消費者是一對一對一的關系
這樣的設計雖然簡單易行,但也會存在一些很嚴重的問題,比如:
- 並行度就會成為消息系統的瓶頸(吞吐量不夠)
- 更多的異常處理,比如:只要消費端出現問題,就會導致整個處理流程阻塞,我們不得不花費更多的精力來解決阻塞的問題。
但我們的最終目標是要集群的高容錯性和高吞吐量。這似乎是一對不可調和的矛盾,那么阿里是如何解決的?
世界上解決一個計算機問題最簡單的方法:“恰好”不需要解決它!——沈詢
有些問題,看起來很重要,但實際上我們可以通過合理的設計或者將問題分解來規避。如果硬要把時間花在解決問題本身,實際上不僅效率低下,而且也是一種浪費。從這個角度來看消息的順序問題,我們可以得出兩個結論:
- 不關注亂序的應用實際大量存在
- 隊列無序並不意味着消息無序
所以從業務層面來保證消息的順序而不僅僅是依賴於消息系統,是不是我們應該尋求的一種更合理的方式?
最后我們從源碼角度分析RocketMQ怎么實現發送順序消息的。
RocketMQ通過輪詢所有隊列的方式來確定消息被發送到哪一個隊列(負載均衡策略)。比如下面的示例中,訂單號相同的消息會被先后發送到同一個隊列中:
在獲取到路由信息以后,會根據MessageQueueSelector實現的算法來選擇一個隊列,同一個OrderId獲取到的肯定是同一個隊列。
消息重復
上面在解決消息順序問題時,引入了一個新的問題,就是消息重復。那么RocketMQ是怎樣解決消息重復的問題呢?還是“恰好”不解決。
造 成消息重復的根本原因是:網絡不可達。只要通過網絡交換數據,就無法避免這個問題。所以解決這個問題的辦法就是繞過這個問題。那么問題就變成了:如果消費端收到兩條一樣的消息,應該怎樣處理?
- 消費端處理消息的業務邏輯保持冪等性
- 保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現
第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣。第2條原理就是利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。
第1條解決方案,很明顯應該在消費端實現,不屬於消息系統要實現的功能。第2條可以消息系統實現,也可以業務端實現。正常情況下出現重復消息的概率其實很小,如果由消息系統來實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的原因。
RocketMQ 不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重。
事務消息
RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。首先討論一下什么是事務消息以及支持事務消息的必要性。我們以一個轉帳的場景為例來說明這個問題:Bob向Smith轉賬100塊。
在單機環境下,執行事務的情況,大概是下面這個樣子:
單機環境下轉賬事務示意圖
當用戶增長到一定程度,Bob和Smith的賬戶及余額信息已經不在同一台服務器上了,那么上面的流程就變成了這樣:
集群環境下轉賬事務示意圖
這時候你會發現,同樣是一個轉賬的業務,在集群環境下,耗時居然成倍的增長,這顯然是不能夠接受的。那如何來規避這個問題?
大事務 = 小事務 + 異步
將大事務拆分成多個小事務異步執行。這樣基本上能夠將跨機事務的執行效率優化到與單機一致。轉賬的事務就可以分解成如下兩個小事務:
小事務+異步消息
圖中執行本地事務(Bob賬戶扣款)和發送異步消息應該保證同時成功或者同時失敗,也就是扣款成功了,發送消息一定要成功,如果扣款失敗了,就不能再發送消息。那問題是:我們是先扣款還是先發送消息呢?
首先看下先發送消息的情況,大致的示意圖如下:
事務消息:先發送消息
存在的問題是:如果消息發送成功,但是扣款失敗,消費端就會消費此消息,進而向Smith賬戶加錢。
先發消息不行,那就先扣款吧,大致的示意圖如下:
事務消息-先扣款
存在的問題跟上面類似:如果扣款成功,發送消息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。
可能大家會有很多的方法來解決這個問題,比如:直接將發消息放到Bob扣款的事務中去,如果發送失敗,拋出異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。
這里需要說明一下: 如果使用Spring來管理事物的話,大可以將發送消息的邏輯放到本地事物中去,發送消息失敗拋出異常,Spring捕捉到異常后就會回滾此事物,以此來保證本地事物與發送消息的原子性。
RocketMQ支持事務消息,下面來看看RocketMQ是怎樣來實現的。
RocketMQ實現發送事務消息
RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改消息的狀態。
細心的你可能又發現問題了,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,如果發現了Prepared消息,它會向消息發送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
那我們來看下RocketMQ源碼,是如何處理事務消息的。客戶端發送事務消息的部分(完整代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):
接着查看sendMessageInTransaction方法的源碼,總共分為3個階段:發送Prepared消息、執行本地事務、發送確認消息。
endTransaction方法會將請求發往broker(mq server)去更新事務消息的最終狀態:
- 根據sendResult找到Prepared消息 ,sendResult包含事務消息的ID
- 根據localTransaction更新消息的最終狀態
如果endTransaction方法執行失敗,數據沒有發送到broker,導致事務消息的 狀態更新失敗,broker會有回查線程定時(默認1分鍾)掃描每個存儲事務狀態的表格文件,如果是已經提交或者回滾的消息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用我們的事務設置的決斷方法來決定是回滾事務還是繼續執行,最后調用endTransactionOneway讓broker來更新消息的最終狀態。
再回到轉賬的例子,如果Bob的賬戶的余額已經減少,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題,解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程中有可能會出現消息重復的問題,按照前面的思路解決即可。
消費事務消息
這樣基本上可以解決消費端超時問題,但是如果消費失敗怎么辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務的流程,因為某種原因Smith加款失敗,那么需要回滾整個流程。如果消息系統要實現這個回滾流程的話,系統復雜度將大大提升,且很容易出現Bug,估計出現Bug的概率會比消費失敗的概率大很多。這也是RocketMQ目前暫時沒有解決這個問題的原因,在設計實現消息系統時,我們需要衡量是否值得花這么大的代價來解決這樣一個出現概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。
Producer如何發送消息
Producer輪詢某topic下的所有隊列的方式來實現發送方的負載均衡,如下圖所示:
producer發送消息負載均衡
首先分析一下RocketMQ的客戶端發送消息的源碼:
在整個應用生命周期內,生產者需要調用一次start方法來初始化,初始化主要完成的任務有:
- 如果沒有指定namesrv地址,將會自動尋址
- 啟動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清理已經掛掉的broker、向所有broker發送心跳…
- 啟動負載均衡的服務
初始化完成后,開始發送消息,發送消息的主要代碼如下:
代碼中需要關注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲取路由信息並更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲取topic路由信息,如果沒有獲取到,則會自己去namesrv獲取路由信息。selectOneMessageQueue方法通過輪詢的方式,返回一個隊列,以達到負載均衡的目的。
如果Producer發送消息失敗,會自動重試,重試的策略:
- 重試次數 < retryTimesWhenSendFailed(可配置)
- 總的耗時(包含重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
- 同時滿足上面兩個條件后,Producer會選擇另外一個隊列發送消息
轉載地址:https://blog.csdn.net/wy0123/article/details/79496732