消息隊列常見問題及解決方案 參考鏈接
前一章節討論了MQ的優缺點,本章主要針對缺點進行分析解決。
一、如何保證MQ的高可用性 —— (消息丟失)
RabbitMQ鏡像模式
鏡像集群模式是所謂的RabbitMQ的高可用模式,跟普通集群模式不一樣的是,你創建的queue,無論元數據還是queue里的消息都會存在於多個實例上,然后每次你寫消息到queue的時候,都會自動把消息到多個實例的queue里進行消息同步。
優點在於你任何一個實例宕機了,沒事兒,別的實例都可以用。缺點在於性能開銷太大和擴展性很低,同步所有實例,這會導致網絡帶寬和壓力很重,而且擴展性很低,每增加一個實例都會去包含已有的queue的所有數據,並沒有辦法線性擴展queue。
開啟鏡像集群模式可以去RabbitMQ的管理控制台去增加一個策略,指定要求數據同步到所有節點的,也可以要求就同步到指定數量的節點,然后你再次創建queue的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。
Kafka高可用
Kafka 一個最基本的架構認識:由多個 broker 組成,每個 broker 是一個節點;你創建一個 topic,這個 topic 可以划分為多個 partition,每個 partition 可以存在於不同的 broker 上,每個 partition 就放一部分數據。
這就是天然的分布式消息隊列,就是說一個 topic 的數據,是分散放在多個機器上的,每個機器就放一部分數據。
實際上 RabbmitMQ 之類的,並不是分布式消息隊列,它就是傳統的消息隊列,只不過提供了一些集群、HA(High Availability, 高可用性) 的機制而已,因為無論怎么玩兒,RabbitMQ 一個 queue 的數據都是放在一個節點里的,鏡像集群下,也是每個節點都放這個 queue 的完整數據。
Kafka 0.8 以前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,沒法寫也沒法讀,沒有什么高可用性可言。
比如說,我們假設創建了一個 topic,指定其 partition 數量是 3 個,分別在三台機器上。但是,如果第二台機器宕機了,會導致這個 topic 的 1/3 的數據就丟了,因此這個是做不到高可用的。
Kafka 0.8 以后,提供了 HA 機制,就是 replica(復制品) 副本機制。每個 partition 的數據都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那么生產和消費都跟這個 leader 打交道,然后其他 replica 就是 follower。寫的時候,leader 會負責把數據同步到所有 follower 上去,讀的時候就直接讀 leader 上的數據即可。只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那么就要 care 數據一致性的問題,系統復雜度太高,很容易出問題。Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上,這樣才可以提高容錯性。(上述就是說每個被分割的數據partition都會有副本,每個副本鏈都會有一個Leader進行讀寫,其他flower只負責副本存儲)
這么搞,就有所謂的高可用性了,因為如果某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其他機器上都有副本的,如果這上面有某個 partition 的 leader(即Leader宕機了),那么此時會從 follower 中重新選舉一個新的 leader 出來,大家繼續讀寫那個新的 leader 即可。這就有所謂的高可用性了。
寫數據的時候,生產者就寫 leader,然后 leader 將數據落地寫本地磁盤,接着其他 follower 自己主動從 leader 來 pull 數據。一旦所有 follower 同步好數據了,就會發送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會返回寫成功的消息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)
消費的時候,只會從 leader 去讀,但是只有當一個消息已經被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到。
二、如何保證消息消費時的冪等性(如何保證消息不被重復消費)
1. 什么是重復消費?
消息隊列中存儲多個消息,消費者消費完了之后一般都會做出響應,例如Kafka有一個offset概念,每次寫入消息便會分配一個offset,代表消息的序號。當消費者消費完之后便會把自己消費過的消息提交一下,表示已經被消費。如果消費者已經消費但是返回offset過程中宕機了,那么下次重啟之后,會再次消費已經被消費的消息。

2. 重復消費帶來的問題?
例如已經插入了一條數據,再次消費會再次插入。我們需要保證一個數據重復多次時對應的數據不會被改變,不能出錯。
3. 如何解決重復消費?
① 如果寫入數據庫就先根據主鍵查一下,如果已存在就不插入,update即可。
② 如果是寫入Redis,那就沒問題,反正每次都是set,天然冪等性。
③ 生產者每次寫入消息可以加入一個全局唯一ID,類似訂單ID,當消費時,現根據ID去Redis查一下之前是否被消費過,如果被消費過就不處理。
④ 基於數據庫的唯一鍵來保證重復數據不會重復插入多條。因為有唯一鍵約束,再次插入重復數據只會報錯。
三、如何保證消息可靠傳輸
RobbitMQ
1. 消息丟失的三種情況——生產者、消息隊列、消費者

2. 分別討論解決方案
【1】生產者傳輸過程中丟失數據——Confirm機制
使用RabbitMQ的事務功能,此時可以選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據之前開啟 RabbitMQ 事務channel.txSelect,然后發送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產者會收到異常報錯,此時就可以回滾事務channel.txRollback,然后重試發送消息;如果收到了消息,那么可以提交事務channel.txCommit。但是問題是,RabbitMQ 事務機制(同步)一搞,基本上吞吐量會下來,因為太耗性能。
// 開啟事務 channel.txSelect try { // 這里發送消息 } catch (Exception e) { channel.txRollback // 這里再次重發這條消息 } // 提交事務 channel.txCommit
所以一般來說,如果你要確保說寫 RabbitMQ 的消息別丟,可以開啟confirm模式,在生產者那里設置開啟confirm模式之后,你每次寫的消息都會分配一個唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個ack消息,告訴你說這個消息 ok 了。如果 RabbitMQ 沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存里維護每個消息 id 的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發。
事務機制和cnofirm機制最大的不同在於,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是confirm機制是異步的,你發送個消息之后就可以發送下一個消息,然后那個消息RabbitMQ 接收了之后會異步回調你一個接口通知你這個消息接收到了。
所以一般在生產者這塊避免數據丟失,都是用confirm機制的。
【2】消息隊列自身丟失——持久化處理
就是 RabbitMQ 自己弄丟了數據,這個你必須開啟 RabbitMQ 的持久化,就是消息寫入之后會持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。除非極其罕見的是,RabbitMQ 還沒持久化,自己就掛了,可能導致少量數據丟失,但是這個概率較小。
設置持久化有兩個步驟:
- 創建 queue 的時候將其設置為持久化
這樣就可以保證 RabbitMQ 持久化 queue 的元數據,但是不會持久化 queue 里的數據。 - 發送消息的時候將消息的
deliveryMode設置為 2
就是將消息設置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。
必須要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的數據。
持久化可以跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤之后,才會通知生產者ack了,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數據丟了,生產者收不到ack,你也是可以自己重發的。
注意,哪怕是你給 RabbitMQ 開啟了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會導致內存里的一點點數據丟失。
【3】消費者還未處理就宕機了
ack機制,簡單來說,就是你
關閉 RabbitMQ 的自動ack,可以
通過一個 api 來調用就行,然后每次你自己代碼里確保處理完的時候,再在程序里
ack一把。這樣的話,如果你還沒處理完,不就沒有
ack?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。
KafKa(與RobbitMQ類似)
生產環境也遇到過,我們也是,之前 Kafka 的 leader 機器宕機了,將 follower 切換為 leader 之后(還未進行備份),就會發現說這個數據就丟了。
所以此時一般是要求起碼設置如下 4 個參數:
- 給 topic 設置
replication.factor參數:這個值必須大於 1,要求每個 partition 必須有至少 2 個副本。 - 在 Kafka 服務端設置
min.insync.replicas參數:這個值必須大於 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯系,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。 - 在 producer 端設置
acks=all:這個是要求每條數據,必須是寫入所有 replica 之后,才能認為是寫成功了。 - 在 producer 端設置
retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
簡單來說就是要配置參數,使得每個partition至少有兩個副本、每個Leader至少感知一個follower和自己保持聯系、只有寫入所有replica副本才算寫入成功、一旦寫入失敗就無限重試。
我們生產環境就是按照上述要求配置的,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。
大家都知道 Kafka 會自動提交 offset,那么只要關閉自動提交 offset,在處理完之后自己手動提交 offset,就可以保證數據不會丟。
【3】生產者丟失
如果按照上述的思路設置了 acks=all,一定不會丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。
四、如何保證消息的順序性
1. 失序場景
binlog,接着這三條
binlog 發送到 MQ 里面,到消費出來依次執行,起碼得保證人家是按照順序來的吧?不然本來是:增加、修改、刪除;你楞是換了順序給執行成刪除、修改、增加,不全錯了么。
- RabbitMQ:一個 queue,多個 consumer,這不明顯亂了;
- kafka:一個 topic,一個 partition,一個 consumer,內部多線程,這不也明顯亂了。
2.解決方案
RabbitMQ
拆分多個 queue,每個 queue 一個 consumer,就是多一些 queue 而已,確實是麻煩點;或者就一個 queue 但是對應一個 consumer,然后這個 consumer 內部用內存隊列做排隊,然后分發給底層不同的 worker 來處理。
kafka
一個 topic,一個 partition,一個 consumer,內部單線程消費;寫 N 個內存 queue,然后對於 N 個線程,每個線程分別消費一個內存 queue 即可。
五、如何解決消息隊列的延時以及過期失效問題?
假設你用的是 RabbitMQ,RabbtiMQ 是可以設置過期時間的,也就是 TTL。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在 mq 里,而是大量的數據會直接搞丟。
這個情況下,就不是說要增加 consumer 消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當時就直接丟棄數據了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然后重新灌入 mq 里面去,把白天丟的數據給他補回來。也只能是這樣了。
假設 1 萬個訂單積壓在 mq 里面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來,手動發到 mq 里去再補一次。
六、消息隊列滿了之后怎么處理?
七、有幾百萬消息持續擠壓幾小時怎么解決?
幾千萬條數據在 MQ 里積壓了七八個小時,從下午 4 點多,積壓到了晚上 11 點多。這個是我們真實遇到過的一個場景,確實是線上故障了,這個時候要不然就是修復 consumer 的問題,讓它恢復消費速度,然后傻傻的等待幾個小時消費完畢。這個肯定不能在面試的時候說吧。
一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鍾就是 18 萬條。所以如果你積壓了幾百萬到上千萬的數據,即使消費者恢復了,也需要大概 1 小時的時間才能恢復過來。
一般這個時候,只能臨時緊急擴容了,具體操作步驟和思路如下:
- 先修復 consumer 的問題,確保其恢復消費速度,然后將現有 cnosumer 都停掉。
- 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量。
- 然后寫一個臨時的分發數據的 consumer 程序,這個程序部署上去消費積壓的數據,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。
- 接着臨時征用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數據。這種做法相當於是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數據。
- 等快速消費完積壓數據之后,得恢復原先部署的架構,重新用原先的 consumer 機器來消費消息。
八、如果讓你設計一個MQ,如何考慮?
可伸縮性?
是否要持久化磁盤?
高可用性?
可靠性?
