RabbitMQ問題


目錄

1:什么場景使用了mq?直接掉接口不行嗎?

2:用消息隊列都有什么優點和缺點?

3:Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么區別?

4:那你們是如何保證消息隊列的高可用的?

5:如何保證消息不被重復消費啊?如何保證消費的時候是冪等的啊?

6:如何保證消息的可靠性傳輸啊?要是消息丟失了怎么辦啊?

7:那如何保證消息的順序性?

8:如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以后該怎么處理?有幾百萬消息持續積壓幾小時,說說怎么解決?

9:如果讓你寫一個消息隊列,該如何進行架構設計啊?說一下你的思路。

 1:為什么使用MQ?

主要是:解耦、異步、削峰。

(1)解耦:A 系統發送數據到 BCD 三個系統,通過接口調用發送。如果 E 系統也要這個數據呢?那如果 C 系統現在不需要了呢?A 系統負責人幾乎崩潰......A 系統跟其它各種亂七八糟的系統嚴重耦合,A 系統產生一條比較關鍵的數據,很多系統都需要 A 系統將這個數據發送過來。如果使用 MQ,A 系統產生一條數據,發送到 MQ 里面去,哪個系統需要數據自己去 MQ 里面消費。如果新系統需要數據,直接從 MQ 里消費即可;如果某個系統不需要這條數據了,就取消對 MQ 消息的消費即可。這樣下來,A 系統壓根兒不需要去考慮要給誰發送數據,不需要維護這個代碼,也不需要考慮人家是否調用成功、失敗超時等情況。

就是一個系統或者一個模塊,調用了多個系統或者模塊,互相之間的調用很復雜,維護起來很麻煩。但是其實這個調用是不需要直接同步調用接口的,如果用 MQ 給它異步化解耦。

(2)異步:A 系統接收一個請求,需要在自己本地寫庫,還需要在 BCD 三個系統寫庫,自己本地寫庫要 3ms,BCD 三個系統分別寫庫要 300ms、450ms、200ms。最終請求總延時是 3 + 300 + 450 + 200 = 953ms,接近 1s,用戶感覺搞個什么東西,慢死了慢死了。用戶通過瀏覽器發起請求。如果使用 MQ,那么 A 系統連續發送 3 條消息到 MQ 隊列中,假如耗時 5ms,A 系統從接受一個請求到返回響應給用戶,總時長是 3 + 5 = 8ms。

(3)削峰:減少高峰時期對服務器壓力。

(4)日志處理

 

2:MQ優缺點

 

優點上面已經說了,就是在特殊場景下有其對應的好處,解耦、異步、削峰。

缺點有以下幾個:

系統可用性降低
系統引入的外部依賴越多,越容易掛掉。萬一 MQ 掛了,MQ 一掛,整套系統崩潰,你不就完了?

系統復雜度提高(分布式事務處理)
硬生生加個 MQ 進來,你怎么保證消息沒有重復消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?問題一大堆。

一致性問題
A 系統處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是 BCD 三個系統那里,BD 兩個系統寫庫成功了,結果 C 系統寫庫失敗了,咋整?你這數據就不一致了。

 

3:Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么區別?

對於吞吐量來說kafka和RocketMQ支撐高吞吐,ActiveMQ和RabbitMQ比他們低一個數量級。對於延遲量來說RabbitMQ是最低的。

1.從社區活躍度

按照目前網絡上的資料,RabbitMQ 、activeM 、ZeroMQ 三者中,綜合來看,RabbitMQ 是首選。 

 

2.持久化消息比較

ActiveMq 和RabbitMq 都支持。持久化消息主要是指我們機器在不可抗力因素等情況下掛掉了,消息不會丟失的機制。

 

3.綜合技術實現

可靠性、靈活的路由、集群、事務、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統等等。

RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。當然ZeroMq 也可以做到,不過自己必須手動寫代碼實現,代碼量不小。尤其是可靠性中的:持久性、投遞確認、發布者證實和高可用性。

 

4.高並發

毋庸置疑,RabbitMQ 最高,原因是它的實現語言是天生具備高並發高可用的erlang 語言。

 

5.比較關注的比較, RabbitMQ 和 Kafka

RabbitMq 比Kafka 成熟,在可用性上,穩定性上,可靠性上,  RabbitMq  勝於  Kafka  (理論上)。

另外,Kafka 的定位主要在日志等方面, 因為Kafka 設計的初衷就是處理日志的,可以看做是一個日志(消息)系統一個重要組件,針對性很強,所以 如果業務方面還是建議選擇 RabbitMq 。

還有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出來很多。

 

特性 ActiveMQ RabbitMQ RocketMQ Kafka
單機吞吐量 萬級,比 RocketMQ、Kafka 低一個數量級 同 ActiveMQ 10 萬級,支撐高吞吐 10 萬級,高吞吐,一般配合大數據類的系統來進行實時數據計算、日志采集等場景
topic 數量對吞吐量的影響     topic 可以達到幾百/幾千的級別,吞吐量會有較小幅度的下降,這是 RocketMQ 的一大優勢,在同等機器下,可以支撐大量的 topic topic 從幾十到幾百個時候,吞吐量會大幅度下降,在同等機器下,Kafka 盡量保證 topic 數量不要過多,如果要支撐大規模的 topic,需要增加更多的機器資源
時效性 ms 級 微秒級,這是 RabbitMQ 的一大特點,延遲最低 ms 級 延遲在 ms 級以內
可用性 高,基於主從架構實現高可用 同 ActiveMQ 非常高,分布式架構 非常高,分布式,一個數據多個副本,少數機器宕機,不會丟失數據,不會導致不可用
消息可靠性 有較低的概率丟失數據   經過參數優化配置,可以做到 0 丟失 同 RocketMQ
功能支持 MQ 領域的功能極其完備 基於 erlang 開發,並發能力很強,性能極好,延時很低 MQ 功能較為完善,還是分布式的,擴展性好 功能較為簡單,主要支持簡單的 MQ 功能,在大數據領域的實時計算以及日志采集被大規模使用
 

4:如何保證高可用的?

RabbitMQ 是比較有代表性的,因為是基於主從(非分布式)做高可用性的,我們就以 RabbitMQ 為例子講解第一種 MQ 的高可用性怎么實現。RabbitMQ 有三種模式:單機模式、普通集群模式、鏡像集群模式。

 

單機模式,就是 Demo 級別的,一般就是你本地啟動了玩玩兒的😄,沒人生產用單機模式

 

普通集群模式,意思就是在多台機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。你創建的 queue,只會放在一個 RabbitMQ 實例上,但是每個實例都同步 queue 的元數據(元數據可以認為是 queue 的一些配置信息,通過元數據,可以找到 queue 所在實例)。你消費的時候,實際上如果連接到了另外一個實例,那么那個實例會從 queue 所在實例上拉取數據過來。這方案主要是提高吞吐量的,就是說讓集群中多個節點來服務某個 queue 的讀寫操作。

 

鏡像集群模式:這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通集群模式不一樣的是,在鏡像集群模式下,你創建的 queue,無論元數據還是 queue 里的消息都會存在於多個實例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的全部數據的意思。然后每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一個策略,這個策略是鏡像集群模式的策略,指定的時候是可以要求數據同步到所有節點的,也可以要求同步到指定數量的節點,再次創建 queue 的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。這樣的話,好處在於,你任何一個機器宕機了,沒事兒,其它機器(節點)還包含了這個 queue 的完整數據,別的 consumer 都可以到其它節點上去消費數據。壞處在於,第一,這個性能開銷也太大了吧,消息需要同步到所有機器上,導致網絡帶寬壓力和消耗很重!RabbitMQ 一個 queue 的數據都是放在一個節點里的,鏡像集群下,也是每個節點都放這個 queue 的完整數據。

 

Kafka 一個最基本的架構認識:由多個 broker 組成,每個 broker 是一個節點;你創建一個 topic,這個 topic 可以划分為多個 partition,每個 partition 可以存在於不同的 broker 上,每個 partition 就放一部分數據。這就是天然的分布式消息隊列,就是說一個 topic 的數據,是分散放在多個機器上的,每個機器就放一部分數據。Kafka 0.8 以后,提供了 HA 機制,就是 replica(復制品) 副本機制。每個 partition 的數據都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那么生產和消費都跟這個 leader 打交道,然后其他 replica 就是 follower。寫的時候,leader 會負責把數據同步到所有 follower 上去,讀的時候就直接讀 leader 上的數據即可。只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那么就要 care 數據一致性的問題,系統復雜度太高,很容易出問題。Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上,這樣才可以提高容錯性。因為如果某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其他機器上都有副本的,如果這上面有某個 partition 的 leader,那么此時會從 follower 中重新選舉一個新的 leader 出來,大家繼續讀寫那個新的 leader 即可。這就有所謂的高可用性了。寫數據的時候,生產者就寫 leader,然后 leader 將數據落地寫本地磁盤,接着其他 follower 自己主動從 leader 來 pull 數據。一旦所有 follower 同步好數據了,就會發送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會返回寫成功的消息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)消費的時候,只會從 leader 去讀,但是只有當一個消息已經被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到。

5:如何保證消息不被重復消費?或者說,如何保證消息消費時的冪等性?

假設你有個系統,消費一條消息就往數據庫里插入一條數據,要是你一個消息重復兩次,你不就插入了兩條,這數據不就錯了?但是你要是消費到第二次的時候,自己判斷一下是否已經消費過了,若是就直接扔了,這樣不就保留了一條數據,從而保證了數據的正確性。

 

在什么場景會出現消息重復消費?比如說消費端已經消費了 offset=2,offset=3,offset=4 的三條數據,正准備把這個 offset 的值傳給 kafka,這時候消費端機器宕機了,這個數據沒傳過去;重啟之后,消費端同步 kafka,kafka 那邊消費的記錄 offset 還是 1,那么 kafka 會認為之前的 2、3、4 都沒有消費過,會把這幾個數據在傳給消費端;這樣消費端這邊就重復對這幾條數據進行消費了。在數據庫里面可能就多了很多重復的數據。
像其他的 MQ,也是一樣,消費端再返回給 MQ 的時候,當機了或者重啟了,那么都會出現重復消費的問題。

問題解決:

冪等性:一個請求,不管重復來多少次,結果是不會改變的。

每個消息都會有唯一的消息 id。
1)、先查再保存
每次保存數據的時候,都先查一下,如果數據存在了那么就不保存。這個情況是並發不高的情況。

2)、業務表添加約束條件
如果你的數據庫將來都不會分庫分表,那么可以在業務表字段加上唯一約束條件(UNIQUE),這樣相同的數據就不會保存為多份。

3)、添加消息表
再數據庫里面,添加一張消息消費記錄表,表字段加上唯一約束條件(UNIQUE),消費完之后就往表里插入一條數據。因為加了唯一約束條件,第二次保存的時候,mysql 就會報錯,就插入不進去;通過數據庫可以限制重復消費。

4)、使用 redis
如果你的系統是分布式的,又做了分庫分表,那么可以使用 redis 來做記錄,把消息 id 存在 redis 里,下次再有重復消息 id 在消費的時候,如果發現 redis 里面有了就不能進行消費。(可以用redis的set來保證是否被消費過了)

5)、高並發下
如果你的系統並發很高,那么可以使用 redis 或者 zookeeper 的分布式對消息 id 加鎖,然后使用上面的幾個方法進行冪等性控制。

6:如何保證消息的可靠性傳輸啊?要是消息丟失了怎么辦啊?

數據的丟失問題,可能出現在生產者、MQ、消費者中

生產者丟失:生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,因為網絡問題啥的,都有可能。此時可以選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據之前開啟 RabbitMQ 事務channel.txSelect,然后發送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產者會收到異常報錯,此時就可以回滾事務channel.txRollback,然后重試發送消息;如果收到了消息,那么可以提交事務channel.txCommit。吞吐量會下來,因為太耗性能。所以一般來說,如果你要確保說寫 RabbitMQ 的消息別丟,可以開啟confirm模式,在生產者那里設置開啟confirm模式之后,你每次寫的消息都會分配一個唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個ack消息,告訴你說這個消息 ok 了。如果 RabbitMQ 沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存里維護每個消息 id 的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發。事務機制和cnofirm機制最大的不同在於,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是confirm機制是異步的,你發送個消息之后就可以發送下一個消息,然后那個消息RabbitMQ 接收了之后會異步回調你一個接口通知你這個消息接收到了。所以一般在生產者這塊避免數據丟失,都是用confirm機制的。

MQ中丟失:就是 RabbitMQ 自己弄丟了數據,這個你必須開啟 RabbitMQ 的持久化,就是消息寫入之后會持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。設置持久化有兩個步驟:創建 queue 的時候將其設置為持久化,這樣就可以保證 RabbitMQ 持久化 queue 的元數據,但是不會持久化 queue 里的數據。第二個是發送消息的時候將消息的 deliveryMode 設置為 2,就是將消息設置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。必須要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的數據。持久化可以跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤之后,才會通知生產者ack了,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數據丟了,生產者收不到ack,你也是可以自己重發的。注意,哪怕是你給 RabbitMQ 開啟了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會導致內存里的一點點數據丟失。

消費端丟失:你消費的時候,剛消費到,還沒處理,結果進程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認為你都消費了,這數據就丟了。這個時候得用 RabbitMQ 提供的ack機制,簡單來說,就是你關閉 RabbitMQ 的自動ack,可以通過一個 api 來調用就行,然后每次你自己代碼里確保處理完的時候,再在程序里ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。

 

 

7:如何保證消息順序性

先看看順序會錯亂的場景:RabbitMQ:一個 queue,多個 consumer,這不明顯亂了;

總體解決:

在每個消息被創建時,都將被賦予一個全局唯一的,單調遞增的,連續的序列號(SN)。可以通過一個全局計數器來實現這一點,通過比較兩個消息的SN,確定其先后順序。

解決:rabbitmq保證數據的順序性:

1.多個消費者來消費同一個隊列導致的消息順序不一致:

如果存在多個消費者, 那么就讓每個消費者對應一個queue,然后把要發送 的數據(可能需要順序的一組消息,比如一條數據的新建,刪除)全都放到一個queue,這樣就能保證所有的數據只到達一個消費者從而保證每個數據到達數據庫都是順序的。
rabbitmq:拆分多個queue,每個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue但是對應一個consumer,然后這個consumer內部用內存隊列做排隊,然后分發給底層不同的worker來處理。
 

上面的解決方案可能導致某些隊列不堪重負,其他隊列為空。

所以我們還是可能把一條數據的消息平均放到所有的消息隊列里面。

比如消息M1的SN=100,消息M2的SN=110。為了保證消息M1在M2之前被處理,處理M2之前需要檢查M1是否已經被處理了。

方案1:在將消息M1 SN=100加入到消息隊列A的時候,對其他每個消息隊列加入一個特殊的消息(Block Message BM)。當某個消息處理BM的時候,檢查M1是否被處理了,如果被處理就繼續處理后面的消息,如果沒有,就繼續等待,直到M1被處理。(但是可能導致大量的隊列block,所以會造成很大的性能浪費)

方案2:其他每個消息隊列記錄自己處理最后一個消息的SN,被稱為Last SN,LSN。假設消息M1的SN為100,消息M2的SN=110.當處理M2時,檢查其他消息隊列的LSN是否大於100:

如果都大於100,說明M1已經被處理了。

如果有一個或多個消息隊列LSN小於100,表示M1還沒有被處理。block所有LSN大於100的隊列,等待LSN小於100的隊列繼續執行,直到他們的LSN大於100,或者隊列為空。

 

2.對於同一個consumer,我們在消費端可能會使用多線程來處理導致的消息不按順序

因為單線程的處理速度慢,為了加快處理時間和吞吐量,會使用 thread 來處理。在消費端加入線程之后,就會出現順序不一致的情況。

 可以在同一個consumer選擇多個內存隊列,每一個線程去消費同一個內存隊列的內容。在consumer端,我們可以通過hash進行分布。

kafka保證數據的順序性

參考:https://juejin.im/post/5c9b1c155188251d806727b2

 

8:如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以后該怎么處理?有幾百萬消息持續積壓幾小時,說說怎么解決?

消息積壓處理辦法:臨時緊急擴容

1.先修復 consumer 的問題,確保其恢復消費速度,然后將現有 consumer 都停掉。
2.新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍或者20倍的 queue 數量。
3.然后寫一個臨時的分發數據的 consumer 程序,這個程序部署上去消費積壓的數據,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。
4.接着臨時征用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數據。這種做法相當於是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數據。
5.等快速消費完積壓數據之后,得恢復原先部署的架構,重新用原先的 consumer 機器來消費消息。

MQ中消息失效:假設你用的是 RabbitMQ,RabbtiMQ 是可以設置過期時間的,也就是 TTL。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在 mq 里,而是大量的數據會直接搞丟。

我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當時就直接丟棄數據了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然后重新灌入 mq 里面去,把白天丟的數據給他補回來。也只能是這樣了。假設 1 萬個訂單積壓在 mq 里面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來,手動發到 mq 里去再補一次。

 

MQ消息隊列塊滿了:如果消息積壓在 mq 里,你很長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然后走第二個方案,到了晚上再補數據吧。

9:設計MQ思路

比如說這個消息隊列系統,我們從以下幾個角度來考慮一下:

首先這個 mq 得支持可伸縮性吧,就是需要的時候快速擴容,就可以增加吞吐量和容量,那怎么搞?設計個分布式的系統唄,參照一下 kafka 的設計理念,broker -> topic -> partition,每個 partition 放一個機器,就存一部分數據。如果現在資源不夠了,簡單啊,給 topic 增加 partition,然后做數據遷移,增加機器,不就可以存放更多數據,提供更高的吞吐量了?

其次你得考慮一下這個 mq 的數據要不要落地磁盤吧?那肯定要了,落磁盤才能保證別進程掛了數據就丟了。那落磁盤的時候怎么落啊?順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這就是 kafka 的思路。

其次你考慮一下你的 mq 的可用性啊?這個事兒,具體參考之前可用性那個環節講解的 kafka 的高可用保障機制。多副本 -> leader & follower -> broker 掛了重新選舉 leader 即可對外服務。

能不能支持數據 0 丟失啊?可以的,參考我們之前說的那個 kafka 數據零丟失方案。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM