一、什么是消息隊列?
消息隊列,是分布式系統中重要的組件。
- 主要解決應用耦合,異步消息,流量削峰等問題。
- 可實現高性能,高可用,可伸縮和最終一致性架構,是大型分布式系統中不可缺少的中間件。
目前主流的消息隊列有:
- Kafka
- RabbitMq
- RocketMq,老版本是MetaQ
- ActiveMq,目前用的人是越來越少了。
另外,消息隊列容易和java中的本地MessageQuene搞混,所以消息隊列更多的被稱為消息中間件或者分布式消息隊列等。
二、消息隊列由哪些角色組成?
- 生產者(Producer):負責生產消息
- 消費者(Consumer):負責消費消息
- 消息代理(Message Broker):負責存儲消息和轉發消息兩件事情。其中,轉發消息分為推送和拉去兩種方式。
- 拉取(Pull),是指Consumer主動從Message Borker獲取消息
- 推送(Push),是指Message Broker主動將Consumer感興趣的消息推送給Consumer。
三、消息隊列有哪些使用場景?
一般來說,有四大應用場景:
- 應用解耦
- 異步處理
- 流量削峰
- 消息通訊
- 日志處理
其中,應用解耦、異步處理是比較核心的。
四、為什么使用消息隊列進行應用解耦?
傳統模式下,如下圖所示:
- 缺點比較明顯,系統間耦合性太強,系統A在代碼中直接調用系統B和系統C的代碼,如果將來D系統接入,系統A還需要修改代碼,過於麻煩!並且萬一系統A、B、C改接口,還要持續跟進。
引入消息隊列,
- 將消息寫入消息隊列,需要消息的系統自己從消息隊列中訂閱,從而系統A不需要做任何修改。所以,有了消息隊列之后,從主動調用的方式,變成了消息的訂閱發布(或者說,事件的發布和監聽),從而解耦。舉個實際場景的例子,用戶支付訂單完成后,系統需要給用戶發紅包,增加積分等行為,就可以通過這樣的方式進行解耦。
五、為什么使用消息隊列進行異步處理?
- A系統需要串行逐個同步調用系統B、C、D。這其中會有很多問題。
- 如果每個系統調用執行是200ms,那么這個邏輯就要執行600ms。非常慢
- 如果任意一個系統調用異常報錯,那個整個邏輯就報錯了
- 如果任意一個系統調用超時了,那么整個邏輯就超時了。
- 。。。。。
引入消息隊列后,如下圖所示:
- 通過發送3條MQ消息,通過Consumer消費,從而異步,並行調用系統B、C、D
- 因為發送Mq消息是比較快的,假設每個操作2ms,那個這個邏輯只要執行6ms,非常快。
- 當然了,可能發送MQ消息會失敗。當然,這個是會存在的。此時可以異步重試,當然,可能異步重試的過程中,JVM進程掛了,此時又需要其他的機制來保證,不過,相比串行逐個同步調用系統B、C、D來說,出錯的幾率會低很多。
六、為什么使用消息隊列進行流量削峰?
對於大多數系統,一定會有訪問量的波峰和波谷。比較明顯的,就是我們經常使用的美團外賣,又或者被人詬病的小米秒殺。
如果在並發量大的時間,所有請求直接打到數據庫,造成數據庫直接掛掉
通過將請求先轉發到消息隊列匯總,然后,系統A慢慢的按照數據庫能處理的並發量,從消息隊列中逐步拉去消息進行消費。在生產中,這個暫時的高峰期積壓是允許的。
相對來說,消息隊列的性能會比數據庫性能更好,並且,橫向拓展能力更強。
七、為什么使用消息隊列進行消息通信?
消息通訊是指:消息隊列一般都內置了高效的通信機制,因此也可以 用在純的消息通訊。
- IM 聊天。
- 點對點消息隊列。可能大家會比較懵逼,有基於消息隊列的 RPC 框架實現,例如 rabbitmq-jsonrpc ,雖然現在用的人比較少。
- 面向物聯網的 MQTT 。阿里在開源的 RocketMQ 基礎上,增加了 MQTT 協議的支持,可見消息隊列 for IoT
八、如何使用消息隊列進行日志處理?
日志處理:是指將消息隊列應用在日志處理中,比如kafka的應用,解決大量日志傳輸的問題。
- 日志采集客戶端,負責日志數據采集,定時批量寫入kafka隊列。
- kafka消息隊列,負責日志數據的接收、存儲和轉發
- 日志處理應用:訂閱並消費kafka隊列中的日志數據
大家最熟悉的就是ELK+Kafka日志方案,如下:
- kafka:接收用戶日志的消息隊列。
- Logstash:對接kafka寫入的日志,做日志解析,統一成Json輸出給 Elasticsearch 中。
-
Elasticsearch :實時日志分析服務的核心技術,一個 schemaless ,實時的數據存儲服務,通過 index 組織數據,兼具強大的搜索和統計功能。
- Kibna:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇 ELK stack 的重要原因。
九、消息隊列有什么優缺點?
任何中間件的引入,帶來優點的時候,也同時會帶來缺點。
- 系統可用性降低。
- 系統復雜度提高。
- 一致性問題。
- Producer發消息給message Broker階段
-
- Producer發消息給message Broker時,不要求Messaage Broker對接收的消息響應確認,Producer也不用關心Message Broker是否收到消息了。
- Message Broker存儲/轉發階段。
- 對Message Broker的存儲不要求持久性
- 轉發消息時,也不關心Consumer是否真的收到了。
- Consumer消費階段
- Consumer從Message Broker中獲取到消息后,可以從Message Broker刪除消息。
- 或者Consumer Broker在消息被Consumer拿去消費時刪除消息,不用關心Consumer最后對消息的消費情況如何。
2.消息至少被消費一次
適合不能容忍丟消息,允許重復消費的任務。
- Producer發送消息到Message Broker階段
- Producer發消息給Message Broker,Message Broker必須響應對消息的確認。
- Message Broker存儲/轉發階段
- Message Broker必須提供持久性的保障
- 轉發消息時,Message Broker需要Consumer通知刪除消息,才能將消息刪除。
- Consumer消費階段
- Consumer從Message Broker中獲取消息,必須在消費完成后,Message Broker上的消息才能被刪除。
3.消息僅被消費一次
適合對消費消費情況要求非常高的任務,實現較為復雜。
在這里需要考慮一個問題,就是這里的“僅被消費一次”指的是如下哪種情況:
- Message Broker上存儲的消息被Consumer僅消費一次。
- Producer發送消息到Message Broker階段
- Producer發消息給Message Broker時,不要求Message Broker對接收到的消息響應確認。Producer也不關心Message Broker是否收到消息了。
- Message Broker存儲/轉發階段
- Message Broker必須提供持久性保障
- 並且,每條消息在其消費隊列里有唯一標識(這個唯一標識可以由Producer產生,也可以由Message Broker產生)
- Consumer消費階段
- Consumer從Message Broker中獲取到消息后,需要記錄下消費的消費標識,以便在后續消費中防止對某個消息重復消費(比如Consumer獲取到消息,消費完后,還沒有來得及從Messaage Broker刪除消息九掛了,這樣Message Broker如果把消息重新加入待消費隊列的話,那么這條消息就會被重復消費了。)
- Producer發送消息到Message Broker階段
- Producer上產生的消息僅被Consumer僅消費一次。
- Producer發送消息到Message Broker階段
- Producer發消息到Message Broker時,Message Broker必須響應對消息的確認,並且Producer負責為該消息產生一個標識,以防止Consumer重復消費(因為Producer發消息給Message Broker后,由於網絡問題沒收到Message Broker的響應,可能會重發消息給高Message Broker)。
- Message Broker存儲/轉發階段
- Message Broker必須提供持久性保障
- 並且,每條消息在其消息隊列中有唯一標識(這一個標識需要由Producer產生)
- Consumer消費階段
- Consumer僅被消費一次。
- Producer發送消息到Message Broker階段
雖然3種方式看起來比較復雜,但是我們會發現,是層層遞進,越來越可靠。
實際生產場景下,我們是傾向第三種的第二種情況,每條消息從Producer保證被送達,並且被Consumer僅消費一次。當然,重心還是如何保證Consumer僅消費一次。雖然說,消息產生的唯一標識可以在框架級去做排重,但也是最穩妥的,還是業務層也保證消費的冪等性。
十一、消息隊列有幾種投遞方式?分別有什么缺點?
消息隊列有兩種投遞方式:push推送和pull拉取
push:
- 優點:就是及時性
- 缺點:就是受限於消費者的消費能力,可能造成消息的堆積,Broker會不斷的給消費者發送不能處理的消息。
pull:
- 優點:就是主動權掌握在消費者,可以根據自己的消費速度進行消息拉取。
- 缺點:就是消費方不知道什么時候可以獲取最新的消息,會有消息延遲和忙等。
目前的消息隊列,基於push+pull模式結合的方式,Broker僅僅告訴Consumer有新的消息,具體的消息拉取,還需要Consumer自己主動拉取。
十二、如何保證消費者的消息的冪等性?
如果要到達消費者的消息消息的冪等性,就需要消息僅被消費一次,且每一條消息從Producer保證被送達,並且被Consumer僅消費一次。
對於Producer來說:
- 可能因為網絡問題,Producer重試多次發送消息,實際第一次就發送成功,那么就會產生多條相同的消息。
對於Consumer來說:
- 可能因為Broker的消息進度丟失,導致消息重復投遞給Consumer。
- Consumer消費成功,但是因為JVM異常崩潰,導致消息的消費進度未及時同步給Consumer。
如何解決?
所以,上述的種種情況,都可能導致消費者會獲取到重復的消息,那么我們的死來就無法是解決不發送、投遞重復的消息,而是消費者在消費時,如何保證冪等性。
消費者實現冪等性,有兩種方式:
1.框架層統一封裝
2.業務層自己實現。
- 1.框架層統一封裝
首先需要有一個消息排重的唯一標識,該編號由Producer生成,例如uuid,或者其他唯一編號的算法。
然后就需要一個排重的存儲器,例如說:
-
使用關系數據庫,增加一個排重表,使用消息編號作為唯一主鍵。
- 使用 KV 數據庫,KEY 存儲消息編號,VALUE 任一。此處,暫時不考慮 KV 數據庫持久化的問題
- 在消息消費執行業務邏輯之前,插入這條排重記錄。但是,此時會有可能 JVM 異常崩潰。那么 JVM 重啟后,這條消息就無法被消費了。因為,已經存在這條排重記錄。
- 在消息消費執行業務邏輯之后,插入這條排重記錄。
- 如果業務邏輯執行失敗,顯然,我們不能插入這條排重記錄,因為我們后續要消費重試。
-
如果業務邏輯執行成功,此時,我們可以插入這條排重記錄。但是,萬一插入這條排重記錄失敗呢?那么,需要讓插入記錄和業務邏輯在同一個事務當中,此時,我們只能使用數據庫。
-
業務層自己實現
- 先查詢數據庫,判斷數據是否已經被更新過。如果是,則直接返回消費完成,否則執行消費。
- 更新數據庫時,帶上數據的狀態。如果更新失敗,則直接返回消費完成,否則執行消費。
- …
ActiveMq
一般的業務系統要引入Mq,最早大家都用ActiveMQ,但是現在確實大家用的不多了,沒經過大規模吞吐量場景的驗證,社區也不是很活躍
RabbitMQ
后來大家開始用RabbitMq,但是確實Erlang語言阻止了大量的java工程師去深入研究和掌控它,對公司幾乎處於不可控的狀態,但是確實人家是開源的,比較穩定的支持,社區活躍度也高,另外,因為Spring Cloud在消息隊列上的支持,對RabbitMq是比較不錯的。所以在選型上又更加被推崇。