轉自:https://blog.csdn.net/qq_30281559/article/details/93662737
1.什么是消息隊列?
可以看作是一個存放消息的容器,當我們需要使用消息的時候可以取出消息供自己使用。消息隊列是分布式系統中重要的組件,使用消息隊列主要是為了通過異步處理提高系統性能和削峰、降低系統耦合性。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。
通過提供 消息傳遞 和 消息排隊 模型,它可以在 分布式環境 下提供 應用解耦、彈性伸縮、冗余存儲、流量削峰、異步通信、數據同步 等等功能。
消息隊列容易和 Java 中的本地 MessageQueue 搞混,所以消息隊列更多被稱為消息中間件、分布式消息隊列等等。
2.為什么要使用消息隊列?
1 解耦
消息隊列使利用發布-訂閱模式工作,消息發送者(生產者)發布消息,一個或多個消息接受者(消費者)訂閱消息。消息發送者(生產者)和消息接受者(消費者)之間沒有直接耦合,消息發送者將消息發送至分布式消息隊列即結束對消息的處理,消息接受者從分布式消息隊列獲取該消息后進行后續處理,並不需要知道該消息從何而來。
消息接受者對消息進行過濾、處理、包裝后,構造成一個新的消息類型,將消息繼續發送出去,等待其他消息接受者訂閱該消息。因此基於事件(消息對象)驅動的業務架構可以是一系列流程。
另外為了避免消息隊列服務器宕機造成消息丟失,會將成功發送到消息隊列的消息存儲在消息生產者服務器上,等消息真正被消費者服務器處理后才刪除消息。在消息隊列服務器宕機后,生產者服務器會選擇分布式消息隊列服務器集群中的其他服務器發布消息。
不要認為消息隊列只能利用發布-訂閱模式工作,只不過在解耦這個特定業務環境下是使用發布-訂閱模式的。除了發布-訂閱模式,還有點對點訂閱模式(一個消息只有一個消費者),我們比較常用的是發布-訂閱模式。 另外,這兩種消息模型是 JMS 提供的,AMQP 協議還提供了 5 種消息模型。
2 異步處理、
傳統模式中一些非必要的業務邏輯以同步的方式運行,太耗費時間。使用消息隊列之后,用戶的請求數據發送給消息隊列之后立即 返回,再由消息隊列的消費者進程從消息隊列中獲取數據,異步寫入數據庫。由於消息隊列服務器處理速度快於數據庫(消息隊列也比數據庫有更好的伸縮性),因此響應速度得到大幅改善。
3 削峰
消息隊列具有很好的削峰作用——即通過異步處理,將短時間高並發產生的事務消息存儲在消息隊列中,從而削平高峰期的並發事務。
3.使用消息隊列帶來的一些問題
系統可用性降低: 在加入MQ之前,需要考慮消息丟失或者說MQ掛掉等等的情況。
系統復雜性提高: 加入MQ之后,你需要保證消息沒有被重復消費、處理消息丟失的情況、保證消息傳遞的順序性等等問題!
一致性問題: 消息隊列可以實現異步,消息隊列帶來的異步確實可以提高系統響應速度。但是,萬一消息的真正消費者並沒有正確消費消息怎么辦?這樣就會導致數據不一致的情況了!
4.JMS兩種消息模型
①點到點(P2P)模型
使用隊列(Queue)作為消息通信載體;滿足生產者與消費者模式,一條消息只能被一個消費者使用,未被消費的消息在隊列中保留直到被消費或超時。比如:我們生產者發送100條消息的話,兩個消費者來消費一般情況下兩個消費者會按照消息發送的順序各自消費一半(也就是你一個我一個的消費。)
② 發布/訂閱(Pub/Sub)模型
發布訂閱模型(Pub/Sub) 使用主題(Topic)作為消息通信載體,類似於廣播模式;發布者發布一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之后才訂閱的用戶則是收不到該條消息的。
5.消息隊列由哪些角色組成?
MQ 角色
-
生產者(Producer):負責產生消息。
-
消費者(Consumer):負責消費消息
-
消息代理(Message Broker):負責存儲消息和轉發消息兩件事情。其中,轉發消息分為推送和拉取兩種方式。
拉取(Pull),是指 Consumer 主動從 Message Broker 獲取消息
推送(Push),是指 Message Broker 主動將 Consumer 感興趣的消息推送給 Consumer 。 -
Topic
主題,發布訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ服務器分發到不同的訂閱者,實現消息的廣播 -
Queue
隊列,PTP模式下,特定生產者向特定queue發送消息,消費者訂閱特定的queue完成指定消息的接收
6.常見消息中間件比較
(1)中小型軟件公司,建議選RabbitMQ.一方面,erlang語言天生具備高並發的特性,而且他的管理界面用起來十分方便。正所謂,成也蕭何,敗也蕭何!他的弊端也在這里,雖然RabbitMQ是開源的,然而國內有幾個能定制化開發erlang的程序員呢?所幸,RabbitMQ的社區十分活躍,可以解決開發過程中遇到的bug,這點對於中小型公司來說十分重要。不考慮rocketmq和kafka的原因是,一方面中小型軟件公司不如互聯網公司,數據量沒那么大,選消息中間件,應首選功能比較完備的,所以kafka排除。不考慮rocketmq的原因是,rocketmq是阿里出品,如果阿里放棄維護rocketmq,中小型公司一般抽不出人來進行rocketmq的定制化開發,因此不推薦。
(2)大型軟件公司,根據具體使用在rocketMq和kafka之間二選一。一方面,大型軟件公司,具備足夠的資金搭建分布式環境,也具備足夠大的數據量。針對rocketMQ,大型軟件公司也可以抽出人手對rocketMQ進行定制化開發,畢竟國內有能力改JAVA源碼的人,還是相當多的。至於kafka,根據業務場景選擇,如果有日志采集功能,肯定是首選kafka了。具體該選哪個,看使用場景。
7.如何保證消息隊列是高可用的?
RocketMQ
- Producer
1、Producer 自身在應用中,所以無需考慮高可用。
2、Producer 配置多個 Namesrv 列表,從而保證 Producer 和 Namesrv 的連接高可用。並且,會從 Namesrv 定時拉取最新的 Topic 信息。
3、Producer 會和所有 Consumer 直連,在發送消息時,會選擇一個 Broker 進行發送。如果發送失敗,則會使用另外一個 Broker 。
4、Producer 會定時向 Broker 心跳,證明其存活。而 Broker 會定時檢測,判斷是否有 Producer 異常下線。 - Consumer
1、Consumer 需要部署多個節點,以保證 Consumer 自身的高可用。當相同消費者分組中有新的 Consumer 上線,或者老的 Consumer 下線,會重新分配 Topic 的 Queue 到目前消費分組的 Consumer 們。
2、Consumer 配置多個 Namesrv 列表,從而保證 Consumer 和 Namesrv 的連接高可用。並且,會從 Consumer 定時拉取最新的 Topic 信息。
3、Consumer 會和所有 Consumer 直連,消費相應分配到的 Queue 的消息。如果消費失敗,則會發回消息到 Broker 中。
4、Consumer 會定時向 Broker 心跳,證明其存活。而 Broker 會定時檢測,判斷是否有 Consumer 異常下線。 - Namesrv
1、Namesrv 需要部署多個節點,以保證 Namesrv 的高可用。
2、Namesrv 本身是無狀態,不產生數據的存儲,是通過 Broker 心跳將 Topic 信息同步到 Namesrv 中。
3、多個 Namesrv 之間不會有數據的同步,是通過 Broker 向多個 Namesrv 多寫。 - Broker
1、多個 Broker 可以形成一個 Broker 分組。每個 Broker 分組存在一個 Master 和多個 Slave 節點。
Master 節點,可提供讀和寫功能。Slave 節點,可提供讀功能。
Master 節點會不斷發送新的 CommitLog 給 Slave節點。Slave 節點不斷上報本地的 CommitLog 已經同步到的位置給 Master 節點。
Slave 節點會從 Master 節點拉取消費進度、Topic 配置等等。
2、多個 Broker 分組,形成 Broker 集群。
Broker 集群和集群之間,不存在通信與數據同步。
3、Broker 可以配置同步刷盤或異步刷盤,根據消息的持久化的可靠性來配置。
kafka
- Zookeeper 部署 2N+1 節點,形成 Zookeeper 集群,保證高可用。
- Kafka Broker 部署集群。
每個 Topic 的 Partition ,基於【副本機制】,在 Broker 集群中復制,形成 replica 副本,保證消息存儲的可靠性。每個 replica 副本,都會選擇出一個 leader 分區(Partition),提供給客戶端(Producer 和 Consumer)進行讀寫(只能讀寫 leader)。 - Kafka Producer 無需考慮集群,因為和業務服務部署在一起。Producer 從 Zookeeper 拉取到 Topic 的元數據后,選擇對應的 Topic 的 leader 分區,進行消息發送寫入。而 Broker 根據 Producer 的 request.required.acks 配置,是寫入自己完成就響應給 Producer 成功,還是寫入所有 Broker 完成再響應。這個,就是胖友自己對消息的可靠性的選擇。
- Kafka Consumer 部署集群。每個 Consumer 分配其對應的 Topic Partition ,根據對應的分配策略。並且,Consumer 只從 leader 分區(Partition)拉取消息。另外,當有新的 Consumer 加入或者老的 Consumer 離開,都會將 Topic Partition 再均衡,重新分配給 Consumer 。
寫數據的時候,生產者就寫 leader,然后 leader 將數據落地寫本地磁盤,接着其他 follower 自己主動從 leader 來 pull 數據。一旦所有 follower 同步好數據了,就會發送 ack 給 leader,leader 收到所有 follower 的 ack 之后,就會返回寫成功的消息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)
消費的時候,只會從 leader 去讀,但是只有當一個消息已經被所有 follower 都同步成功返回 ack 的時候,這個消息才會被消費者讀到。
8.如何保證消息不被重復消費(冪等性)
回答這個問題,首先你別聽到重復消息這個事兒,就一無所知吧,你先大概說一說可能會有哪些重復消費的問題。
首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能會出現消息重復消費的問題,正常。因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的。挑一個 Kafka 來舉個例子,說說怎么重復消費吧。
Kafka 實際上有個 offset 的概念,就是每個消息寫進去,都有一個 offset,代表消息的序號,然后 consumer 消費了數據之后,每隔一段時間(定時定期),會把自己消費過的消息的 offset 提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧”。
但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎么重啟了,如果碰到點着急的,直接 kill 進程了,再重啟。這會導致 consumer 有些消息處理了,但是沒來得及提交 offset,尷尬了。重啟之后,少數消息會再次消費一次
。
舉個栗子。
有這么個場景。數據 1/2/3 依次進入 kafka,kafka 會給這三條數據每條分配一個offset,代表這條數據的序號,我們就假設分配的 offset 依次是 152/153/154。消費者從 kafka去消費的時候,也是按照這個順序去消費。假如當消費者消費了offset=153
的這條數據,剛准備去提交 offset 到 zookeeper,此時消費者進程被重啟了。那么此時消費過的數據 1/2 的 offset 並沒有提交,kafka 也就不知道你已經消費了offset=153
這條數據。那么重啟之后,消費者會找 kafka說,嘿,哥兒們,你給我接着把上次我消費到的那個地方后面的數據繼續給我傳遞過來。由於之前的 offset 沒有提交成功,那么數據 1/2 會再次傳過來,如果此時消費者沒有去重的話,那么就會導致重復消費。
如果消費者干的事兒是拿一條數據就往數據庫里寫一條,會導致說,你可能就把數據 1/2 在數據庫里插入了 2 次,那么數據就錯啦。
其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性。
舉個例子吧。假設你有個系統,消費一條消息就往數據庫里插入一條數據,要是你一個消息重復兩次,你不就插入了兩條,這數據不就錯了?但是你要是消費到第二次的時候,自己判斷一下是否已經消費過了,若是就直接扔了,這樣不就保留了一條數據,從而保證了數據的正確性。
一條數據重復出現兩次,數據庫里就只有一條數據,這就保證了系統的冪等性。
冪等性,通俗點說,就一個數據,或者一個請求,給你重復來多次,你得確保對應的數據是不會改變的,不能出錯。
所以第二個問題來了,怎么保證消息隊列消費的冪等性?
其實還是得結合業務來思考,我這里給幾個思路:
每個消息都會有唯一的消息 id
1)、先查再保存
每次保存數據的時候,都先查一下,如果數據存在了那么就不保存。這個情況是並發不高的情況。
2)、業務表添加約束條件
如果你的數據庫將來都不會分庫分表,那么可以在業務表字段加上唯一約束條件(UNIQUE),這樣相同的數據就不會保存為多份。
3)、添加消息表
再數據庫里面,添加一張消息消費記錄表,表字段加上唯一約束條件(UNIQUE),消費完之后就往表里插入一條數據。因為加了唯一約束條件,第二次保存的時候,MySQL 就會報錯,就插入不進去;通過數據庫可以限制重復消費。
4)、使用 Redis
如果你的系統是分布式的,又做了分庫分表,那么可以使用 Redis 來做記錄,把消息 id 存在 Redis 里,下次再有重復消息 id 在消費的時候,如果發現 Redis 里面有了就不能進行消費。
5)、高並發下
如果你的系統並發很高,那么可以使用 Redis 或者 zookeeper 的分布式對消息 id 加鎖,然后使用上面的幾個方法進行冪等性控制。
9.如何保證生產者的發送消息的可靠性?
用 MQ 有個基本原則,就是數據不能多一條,也不能少一條,不能多,就是說的重復消費和冪等性問題。不能少,就是說這數據別搞丟了。
RabbitMQ
1、生產者弄丟了數據
生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,因為網絡問題啥的,都有可能。
此時可以選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據之前開啟 RabbitMQ 事務channel.txSelect
,然后發送消息,如果消息沒有成功被 RabbitMQ 接收到,那么生產者會收到異常報錯,此時就可以回滾事務channel.txRollback
,然后重試發送消息;如果收到了消息,那么可以提交事務channel.txCommit
。
// 開啟事務 channel.txSelect try { // 這里發送消息 } catch (Exception e) { channel.txRollback // 這里再次重發這條消息 } // 提交事務 channel.txCommit
但是問題是,RabbitMQ 事務機制(同步)一搞,基本上吞吐量會下來,因為太耗性能。
所以一般來說,如果你要確保說寫 RabbitMQ 的消息別丟,可以開啟 confirm
模式,在生產者那里設置開啟 confirm
模式之后,你每次寫的消息都會分配一個唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack
消息,告訴你說這個消息 ok 了。如果 RabbitMQ 沒能處理這個消息,會回調你的一個 nack
接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存里維護每個消息 id 的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發。
事務機制和 confirm
機制最大的不同在於,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是 confirm
機制是異步的,你發送個消息之后就可以發送下一個消息,然后那個消息 RabbitMQ 接收了之后會異步回調你的一個接口通知你這個消息接收到了。
所以一般在生產者這塊避免數據丟失,都是用 confirm
機制的。
2、RabbitMQ 弄丟了數據
就是 RabbitMQ 自己弄丟了數據,這個你必須開啟 RabbitMQ 的持久化,就是消息寫入之后會持久化到磁盤,哪怕是 RabbitMQ 自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。除非極其罕見的是,RabbitMQ 還沒持久化,自己就掛了,可能導致少量數據丟失,但是這個概率較小。
設置持久化有兩個步驟:
- 創建 queue 的時候將其設置為持久化
這樣就可以保證 RabbitMQ 持久化 queue 的元數據,但是它是不會持久化 queue 里的數據的。 - 第二個是發送消息的時候將消息的
deliveryMode
設置為 2
就是將消息設置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。
必須要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的數據。
注意,哪怕是你給 RabbitMQ 開啟了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會導致內存里的一點點數據丟失。
所以,持久化可以跟生產者那邊的 confirm
機制配合起來**,只有消息被持久化到磁盤之后,才會通知生產者** ack
了,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了,數據丟了,生產者收不到 ack
,你也是可以自己重發的。
3、消費端弄丟了數據
RabbitMQ 如果丟失了數據,主要是因為你消費的時候,剛消費到,還沒處理,結果進程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認為你都消費了,這數據就丟了。
這個時候得用 RabbitMQ 提供的 ack
機制,簡單來說,就是你必須關閉 RabbitMQ 的自動 ack
,可以通過一個 api 來調用就行,然后每次你自己代碼里確保處理完的時候,再在程序里 ack
一把。這樣的話,如果你還沒處理完,不就沒有 ack
了?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。
Kafka
1、消費端弄丟了數據
唯一可能導致消費者弄丟數據的情況,就是說,你消費到了這個消息,然后消費者那邊自動提交了 offset,讓 Kafka 以為你已經消費好了這個消息,但其實你才剛准備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。
這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會自動提交 offset,那么只要關閉自動提交 offset,在處理完之后自己手動提交 offset,就可以保證數據不會丟。但是此時確實還是可能會有重復消費,比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。
生產環境碰到的一個問題,就是說我們的 Kafka 消費者消費到了數據之后是寫到一個內存的 queue 里先緩沖一下,結果有的時候,你剛把消息寫入內存 queue,然后消費者會自動提交 offset。然后此時我們重啟了系統,就會導致內存 queue 里還沒來得及處理的數據就丟失了。
2、Kafka 弄丟了數據
這塊比較常見的一個場景,就是 Kafka 某個 broker 宕機,然后重新選舉 partition 的 leader。大家想想,要是此時其他的 follower 剛好還有些數據沒有同步,結果此時 leader 掛了,然后選舉某個 follower 成 leader 之后,不就少了一些數據?這就丟了一些數據啊。
生產環境也遇到過,我們也是,之前 Kafka 的 leader 機器宕機了,將 follower 切換為 leader 之后,就會發現說這個數據就丟了。
所以此時一般是要求起碼設置如下 4 個參數:
- 給 topic 設置
replication.factor
參數:這個值必須大於 1,要求每個 partition 必須有至少 2 個副本。 - 在 Kafka 服務端設置
min.insync.replicas
參數:這個值必須大於 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯系,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。 - 在 producer 端設置
acks=all
:這個是要求每條數據,必須是寫入所有 replica 之后,才能認為是寫成功了。 - 在 producer 端設置
retries=MAX
(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
我們生產環境就是按照上述要求配置的,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。
3、生產者會不會弄丟數據?
如果按照上述的思路設置了 acks=all
,一定不會丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。
如何保證消息的順序性?
先看看順序會錯亂的倆場景:
RabbitMQ:一個 queue,多個 consumer。比如,生產者向 RabbitMQ 里發送了三條數據,順序依次是 data1/data2/data3,壓入的是 RabbitMQ 的一個內存隊列。有三個消費者分別從 MQ 中消費這三條數據中的一條,結果消費者2先執行完操作,把 data2 存入數據庫,然后是 data1/data3。這不明顯亂了。
rabbitmq-order-01
Kafka:比如說我們建了一個 topic,有三個 partition。生產者在寫的時候,其實可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那么這個訂單相關的數據,一定會被分發到同一個 partition 中去,而且這個 partition 中的數據一定是有順序的。
消費者從 partition 中取出來數據的時候,也一定是有順序的。到這里,順序還是 ok 的,沒有錯亂。接着,我們在消費者里可能會搞多個線程來並發處理消息。因為如果消費者是單線程消費處理,而處理比較耗時的話,比如處理一條消息耗時幾十 ms,那么 1 秒鍾只能處理幾十條消息,這吞吐量太低了。而多個線程並發跑的話,順序可能就亂掉了。
kafka-order-01
解決方案
RabbitMQ
拆分多個 queue,每個 queue 一個 consumer,就是多一些 queue 而已,確實是麻煩點;或者就一個 queue 但是對應一個 consumer,然后這個 consumer 內部用內存隊列做排隊,然后分發給底層不同的 worker 來處理。
Kafka
一個 topic,一個 partition,一個 consumer,內部單線程消費,單線程吞吐量太低,一般不會用這個。
寫 N 個內存 queue,具有相同 key 的數據都到同一個內存 queue;然后對於 N 個線程,每個線程分別消費一個內存 queue 即可,這樣就能保證順序性。
如何解決消息過期的問題?
1.大量消息在 mq 里積壓了幾個小時了還沒解決
幾千萬條數據在 MQ 里積壓了七八個小時,從下午 4 點多,積壓到了晚上 11 點多。這個是我們真實遇到過的一個場景,確實是線上故障了,這個時候要不然就是修復 consumer 的問題,讓它恢復消費速度,然后傻傻的等待幾個小時消費完畢。這個肯定不能在面試的時候說吧。
一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鍾就是 18 萬條。所以如果你積壓了幾百萬到上千萬的數據,即使消費者恢復了,也需要大概 1 小時的時間才能恢復過來。
一般這個時候,只能臨時緊急擴容了,具體操作步驟和思路如下:
- 先修復 consumer 的問題,確保其恢復消費速度,然后將現有 consumer 都停掉。
- 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量。
- 然后寫一個臨時的分發數據的 consumer 程序,這個程序部署上去消費積壓的數據,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。
- 接着臨時征用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的數據。這種做法相當於是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數據。
- 等快速消費完積壓數據之后,得恢復原先部署的架構,重新用原先的 consumer 機器來消費消息。
2.mq 中的消息過期失效了
假設你用的是 RabbitMQ,RabbtiMQ 是可以設置過期時間的,也就是 TTL。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在 mq 里,而是大量的數據會直接搞丟。
這個情況下,就不是說要增加 consumer 消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當時就直接丟棄數據了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然后重新灌入 mq 里面去,把白天丟的數據給他補回來。也只能是這樣了。
假設 1 萬個訂單積壓在 mq 里面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來,手動發到 mq 里去再補一次。
3.mq 都快寫滿了
如果消息積壓在 mq 里,你很長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然后走第二個方案,到了晚上再補數據吧。
消息隊列的一般存儲方式有哪些?
當前業界幾款主流的MQ消息隊列采用的存儲方式主要有以下三種方式。
1. 分布式KV存儲
這類 MQ 一般會采用諸如 LevelDB 、RocksDB 和 Redis 來作為消息持久化的方式。由於分布式緩存的讀寫能力要優於 DB ,所以在對消息的讀寫能力要求都不是比較高的情況下,采用這種方式倒也不失為一種可以替代的設計方案。
消息存儲於分布式 KV 需要解決的問題在於如何保證 MQ 整體的可靠性。
2. 文件系統
目前業界較為常用的幾款產品(RocketMQ / Kafka / RabbitMQ)均采用的是消息刷盤至所部署虛擬機/物理機的文件系統來做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)。
刷盤指的是存儲到硬盤。
消息刷盤為消息存儲提供了一種高效率、高可靠性和高性能的數據持久化方式。除非部署 MQ 機器本身或是本地磁盤掛了,否則一般是不會出現無法持久化的故障問題。
3. 關系型數據庫 DB
Apache下開源的另外一款MQ—ActiveMQ(默認采用的KahaDB做消息存儲)可選用 JDBC 的方式來做消息持久化,通過簡單的 XML 配置信息即可實現JDBC消息存儲。
由於,普通關系型數據庫(如 MySQL )在單表數據量達到千萬級別的情況下,其 IO 讀寫性能往往會出現瓶頸。因此,如果要選型或者自研一款性能強勁、吞吐量大、消息堆積能力突出的 MQ 消息隊列,那么並不推薦采用關系型數據庫作為消息持久化的方案。在可靠性方面,該種方案非常依賴 DB ,如果一旦 DB 出現故障,則 MQ 的消息就無法落盤存儲會導致線上故障。
小結
因此,綜合上所述從存儲效率來說,文件系統 > 分布式 KV 存儲 > 關系型數據庫 DB ,直接操作文件系統肯定是最快和最高效的,而關系型數據庫 TPS 一般相比於分布式 KV 系統會更低一些(簡略地說,關系型數據庫本身也是一個需要讀寫文件 Server ,這時 MQ 作為 Client與其建立連接並發送待持久化的消息數據,同時又需要依賴 DB 的事務等,這一系列操作都比較消耗性能),所以如果追求高效的IO讀寫,那么選擇操作文件系統會更加合適一些。但是如果從易於實現和快速集成來看,文件系統 > 分布式 KV 存儲 > 關系型數據庫 DB,但是性能會下降很多。
另外,從消息中間件的本身定義來考慮,應該盡量減少對於外部第三方中間件的依賴。一般來說依賴的外部系統越多,也會使得本身的設計越復雜,所以個人的理解是采用文件系統作為消息存儲的方式,更貼近消息中間件本身的定義。