rabbitMQ常見問題


1. 如何確保消息正確地發送至RabbitMQ?

RabbitMQ使用發送方確認模式,確保消息正確地發送到RabbitMQ。

發送方確認模式:將信道設置成confirm模式(發送方確認模式),則所有在信道上發布的消息都會被指派一個唯一的ID。一旦消息被投遞到目的隊列后,或者消息被寫入磁盤后(可持久化的消息),信道會發送一個確認給生產者(包含消息唯一ID)。如果RabbitMQ發生內部錯誤從而導致消息丟失,會發送一條nack(not acknowledged,未確認)消息。

發送方確認模式是異步的,生產者應用程序在等待確認的同時,可以繼續發送消息。當確認消息到達生產者應用程序,生產者應用程序的回調方法就會被觸發來處理確認消息。

channel.confirm_delivery()

2. 如何確保消息接收方消費了消息?

接收方消息確認機制:消費者接收每一條消息后都必須進行確認(消息接收和消息確認是兩個不同操作)。只有消費者確認了消息,RabbitMQ才能安全地把消息從隊列中刪除。

這里並沒有用到超時機制,RabbitMQ僅通過Consumer的連接中斷來確認是否需要重新發送消息。也就是說,只要連接不中斷,RabbitMQ給了Consumer足夠長的時間來處理消息。

下面羅列幾種特殊情況:

  • 如果消費者接收到消息,在確認之前斷開了連接或取消訂閱,RabbitMQ會認為消息沒有被分發,然后重新分發給下一個訂閱的消費者。(可能存在消息重復消費的隱患,需要根據bizId去重)
  • 如果消費者接收到消息卻沒有確認消息,連接也未斷開,則RabbitMQ認為該消費者繁忙,將不會給該消費者分發更多的消息。

3. 如何避免消息重復投遞或重復消費?

在消息生產時,MQ內部針對每條生產者發送的消息生成一個inner-msg-id,作為去重和冪等的依據(消息投遞失敗並重傳),避免重復的消息進入隊列;

在消息消費時,要求消息體中必須要有一個bizId(對於同一業務全局唯一,如支付ID、訂單ID、帖子ID等)作為去重和冪等的依據,避免同一條消息被重復消費。

 

4. 消息基於什么傳輸?

由於TCP連接的創建和銷毀開銷較大,且並發數受系統資源限制,會造成性能瓶頸。RabbitMQ使用信道的方式來傳輸數據。信道是建立在真實的TCP連接內的虛擬連接,且每條TCP連接上的信道數量沒有限制。

5. 消息如何分發?

若該隊列至少有一個消費者訂閱,消息將以循環(round-robin)的方式發送給消費者。每條消息只會分發給一個訂閱的消費者(前提是消費者能夠正常處理消息並進行確認)。

6. 消息怎么路由?

從概念上來說,消息路由必須有三部分:交換器、路由、綁定。生產者把消息發布到交換器上;綁定決定了消息如何從路由器路由到特定的隊列;消息最終到達隊列,並被消費者接收。

  1. 消息發布到交換器時,消息將擁有一個路由鍵(routing key),在消息創建時設定。
  2. 通過隊列路由鍵,可以把隊列綁定到交換器上。
  3. 消息到達交換器后,RabbitMQ會將消息的路由鍵與隊列的路由鍵進行匹配(針對不同的交換器有不同的路由規則)。如果能夠匹配到隊列,則消息會投遞到相應隊列中;如果不能匹配到任何隊列,消息將進入 “黑洞”。

常用的交換器主要分為一下三種:

  • direct:如果路由鍵完全匹配,消息就被投遞到相應的隊列
  • fanout:如果交換器收到消息,將會廣播到所有綁定的隊列上
  • topic:可以使來自不同源頭的消息能夠到達同一個隊列。 使用topic交換器時,可以使用通配符,比如:“*” 匹配特定位置的任意文本, “.” 把路由鍵分為了幾部分,“#” 匹配所有規則等。特別注意:發往topic交換器的消息不能隨意的設置選擇鍵(routing_key),必須是由"."隔開的一系列的標識符組成。

7. 如何確保消息不丟失?

消息持久化的前提是:將交換器/隊列的durable屬性設置為true,表示交換器/隊列是持久交換器/隊列,在服務器崩潰或重啟之后不需要重新創建交換器/隊列(交換器/隊列會自動創建)。

 

如果消息想要從Rabbit崩潰中恢復,那么消息必須:

  • 在消息發布前,通過把它的 “投遞模式” 選項設置為2(持久)來把消息標記成持久化
  • 將消息發送到持久交換器
  • 消息到達持久隊列

RabbitMQ確保持久性消息能從服務器重啟中恢復的方式是,將它們寫入磁盤上的一個持久化日志文件,當發布一條持久性消息到持久交換器上時,Rabbit會在消息提交到日志文件后才發送響應(如果消息路由到了非持久隊列,它會自動從持久化日志中移除)。一旦消費者從持久隊列中消費了一條持久化消息,RabbitMQ會在持久化日志中把這條消息標記為等待垃圾收集。如果持久化消息在被消費之前RabbitMQ重啟,那么Rabbit會自動重建交換器和隊列(以及綁定),並重播持久化日志文件中的消息到合適的隊列或者交換器上。

8. 使用RabbitMQ有什么好處?

  • 應用解耦(系統拆分)
  • 異步處理(預約掛號業務處理成功后,異步發送短信、推送消息、日志記錄等)
  • 消息分發
  • 流量削峰
  • 消息緩沖
  • ......

 9. 其他

RabbitMQ是 消息投遞服務,在應用程序和服務器之間扮演路由器的角色,而應用程序或服務器可以發送和接收包裹。其通信方式是一種 “發后即忘(fire-and-forget)” 的單向方式。

其中消息包含兩部分內容:有效載荷(payload)和標簽(label)

有效載荷是需要傳輸的數據,可以是任意內容。

標簽描述了有效載荷,RabbitMQ會根據標簽的描述,把消息發送給感興趣的接收方。
 
項目中的MQ:
#rabbitmq
spring.rabbitmq.host=127.0.0.1  主機
spring.rabbitmq.port=5672   端口
spring.rabbitmq.username=guest   用戶名
spring.rabbitmq.password=guest  密碼
spring.rabbitmq.virtual-host=/
#\u6D88\u8D39\u8005\u6570\u91CF
spring.rabbitmq.listener.simple.concurrency= 10   消費者的數量 出隊
spring.rabbitmq.listener.simple.max-concurrency= 10
#\u6D88\u8D39\u8005\u6BCF\u6B21\u4ECE\u961F\u5217\u83B7\u53D6\u7684\u6D88\u606F\u6570\u91CF
spring.rabbitmq.listener.simple.prefetch= 1      每次從隊列中取1個
#\u6D88\u8D39\u8005\u81EA\u52A8\u542F\u52A8
spring.rabbitmq.listener.simple.auto-startup=true
#\u6D88\u8D39\u5931\u8D25\uFF0C\u81EA\u52A8\u91CD\u65B0\u5165\u961F
spring.rabbitmq.listener.simple.default-requeue-rejected= true   消費失敗后是否重新入隊
#\u542F\u7528\u53D1\u9001\u91CD\u8BD5
spring.rabbitmq.template.retry.enabled=true    
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0
Rabbitmq 的隊列容量可以認為是無限的,根據內存有關。 可以設置隊列最大長度,當達到長度的時候,最先入隊的消息將被丟棄。

流量削峰一般在秒殺活動中應用廣泛
場景:秒殺活動,一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入消息隊列。
作用:
1.可以控制活動人數,超過此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒有成功過呢^^) ,先顯示一個排隊中,后端在處理,可能成功可能失敗。
2.可以緩解短時間的高流量壓垮應用(應用程序按自己的最大處理能力獲取訂單)

 
10.為什么使用Rabbit mq?
  • 1.Rabbit mq 是一個高級消息隊列,在分布式的場景下,擁有高性能,對負載均衡也有很好的支持。
  • 2.擁有持久化的機制,進程消息,隊列中的信息也可以保存下來。
  • 3.實現消費者和生產者之間的解耦。
  • 4.對於高並發場景下,利用消息隊列可以使得同步訪問變為串行訪問達到一定量的限流,利於數據庫的操作。
  • 5.可以使用消息隊列達到異步下單的效果,排隊中,后台進行邏輯下單。
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在

11.rabbitMQ的優點(適用范圍)
1. 基於erlang語言開發具有高可用高並發的優點,適合集群服務器。
2. 健壯、穩定、易用、跨平台、支持多種語言、文檔齊全。
3. 有消息確認機制和持久化機制,可靠性高。
4. 開源
其他MQ的優勢:
1. Apache ActiveMQ曝光率最高,但是可能會丟消息。
2. ZeroMQ延遲很低、支持靈活拓撲,但是不支持消息持久化和崩潰恢復。
其他消息隊列,Kafka 定位是日志消息隊列。吞吐量最大。
相比阿里的Rocket MQ ,Rabbit MQ 是可靠性更強,對數據一致性、穩定性和可靠性要求很高的場景。
劣勢是Rabbit mQ 的性能,吞吐量不高。 
 
12.消息通信
常見的web服務之間的通信機制有兩種 ,同步和異步。
同步方法有RMI、Hessin、Burlap、HTTP invoker,雖然同步通信比較簡單,但是存在如下問題:服務需要等待,耦合度高!
而異步通信就不存在這些問題,它無需等待,web服務只要將消息發送后就可以馬上繼續執行;對象對象和解耦;位置獨立,消息發起者只需要知道 消息服務器的位置就可以發送消息,消息接收也無需知道發起者的具體位置,它只需要知道消息服務器在哪里就能獲取消息。
常見的異步消中間件有kafaka、RabbitMQ、ZeroMQ、ActiveMQ,他們之間各有優勢,該用哪一種需要看實際需求而定。本文只介紹RabbitMQ的一些知識。

13.AMQP模型:
1.Server(broker): 接受客戶端連接,實現AMQP消息隊列和路由功能的進程,可以理解為郵局。
2.Virtual Host:其實是一個虛擬概念,類似於權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以划分出多個vhost,每個用戶在自己的vhost創建exchange/queue等,就好比於tomcat中webapps目錄下可以部署多個web項目。
3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列,就好比郵遞員。
4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息,就好比於郵箱。
5.Message: 由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等,就好比於郵箱里面的信件。而Body是真正需要傳輸的APP數據,就像信件里面的信紙。
6.Binding:Binding聯系了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,兩者的匹配方式由Exchange Type決定,就好比於郵件上面的地址。
7.Connection:連接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連接。
8.Channel:信道,僅僅創建了客戶端到Broker之間的連接后,客戶端還是不能發送消息的。需要為每一個Connection創建Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接,可以簡單的理解為線程池中的一個個線程。


14.消息確認(Confirm)機制
RabbitMQ的消息確認機制是為了確保消息發送者知道自己發布的消息被正確接收,如果沒有收到確認時就會認為消息發送過程發送了錯誤,此時就會馬上采取措施,以保證消息能正確送達(類似於HTTP的建立連接時的確認答復)。
具體做法如下:
當RabbitMQ發送消息以后,如果收到消息確認,才將該消息從Quque中移除。如果RabbitMQ沒有收到確認,如果檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ 會將該消息發送給其他消費者;否則就會重新再次發送一個消息給消費者。
持久化
RabbitMQ的一大特性就是支持消息持久化。但是Rabbit MQ默認是不持久隊列、Exchange、Binding以及隊列中的消息的,這意味着一旦消息服務器重啟,所有已聲明的隊列,Exchange,Binding以及隊列中的消息都會丟失,這是因為支持持久化會對性能造成較大的影響。

15.什么時候需要持久化?

1.我們根據自己的需求對它們進行持久化(具體方法可以參考官方的API)。
注意:消息是存在隊列里的,如果要使得消息能持久化,就必須先使隊列持久化。
2.內存緊張時,需要將部分內存中的消息轉移到磁盤中。

16.消息如何刷到磁盤?
1.寫入文件前會有一個Buffer,大小為1M,數據在寫入文件時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。
2.有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每個25ms,Buffer里的數據及未刷新到磁盤的文件內容必定會刷到磁盤。
3.每次消息寫入后,如果沒有后續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0實現,只要進程的信箱里沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操作。

17. RPC
RabbitMQ中也支持RPC,具體實現如下:
1.客戶端發送請求(消息)時,在消息的屬性中設置兩個值replyTo(用於告訴服務器處理完成后將通知我的消息發送到這個Queue中)和correlationId(此次請求的標識號,服務器處理完成后需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
2.服務器端收到消息並處理
3.服務器端處理完消息后,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
4.客戶端之前已訂閱replyTo指定的Queue,從中收到服務器的應答消息后,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行后續業務處理


18.為什么使用消息隊列啊?消息隊列有什么優點和缺點啊?kafka、activemq、rabbitmq、rocketmq都有什么區別以及適合哪些場景?

1.為什么使用消息隊列啊?

通用回答是:我們公司有個什么業務場景,這個業務場景有個什么技術挑戰,如果不用MQ可能會很麻煩,但是你現在用了MQ之后帶給了你很多的好處。

比較核心的有3個業務場景:解耦、異步、削峰

解耦:現場畫個圖來說明一下,A系統發送個數據到BCD三個系統,接口調用發送,那如果E系統也要這個數據呢?那如果C系統現在不需要了呢?現在A系統又要發送第二種數據了呢?A系統負責人瀕臨崩潰中。。。再來點更加崩潰的事兒,A系統要時時刻刻考慮BCDE四個系統如果掛了咋辦?我要不要重發?我要不要把消息存起來?頭發都白了啊。。。

不用MQ的系統耦合場景:

 

使用了MQ之后的解耦場景:

 

異步:現場畫個圖來說明一下,A系統接收一個請求,需要在自己本地寫庫,還需要在BCD三個系統寫庫,自己本地寫庫要3ms,BCD三個系統分別寫庫要300ms、450ms、200ms。最終請求總延時是3 + 300 + 450 + 200 = 953ms,接近1s,用戶感覺搞個什么東西,慢死了慢死了。

不用MQ的同步高延時請求場景:

 

 

使用了MQ進行異步之后的接口性能優化:

 

 

削峰:每天0點到11點,A系統風平浪靜,每秒並發請求數量就100個。結果每次一到11點~1點,每秒並發請求數量突然會暴增到1萬條。但是系統最大的處理能力就只能是每秒鍾處理1000個請求啊。。。尷尬了,系統會死。。。

沒用MQ高峰期系統被打死的場景:

 


使用MQ來進行削峰的場景:

 


(2)消息隊列有什么優點和缺點啊?

優點上面已經說了,就是在特殊場景下有其對應的好處,解耦、異步、削峰

缺點呢?顯而易見的

系統可用性降低:系統引入的外部依賴越多,越容易掛掉,本來你就是A系統調用BCD三個系統的接口就好了,本來ABCD四個系統好好的,沒啥問題,你偏加個MQ進來,萬一MQ掛了咋整?MQ掛了,整套系統崩潰了,你不就完了么。

系統復雜性提高:硬生生加個MQ進來,你怎么保證消息沒有重復消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?頭大頭大,問題一大堆,痛苦不已

一致性問題:A系統處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是BCD三個系統那里,BD兩個系統寫庫成功了,結果C系統寫庫失敗了,咋整?你這數據就不一致了。

所以消息隊列實際是一種非常復雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉,最好之后,你會發現,媽呀,系統復雜度提升了一個數量級,也許是復雜了10倍。但是關鍵時刻,用,還是得用的。。。

 

(3)kafka、activemq、rabbitmq、rocketmq都有什么優點和缺點啊?

 

 

 優劣勢總結:

         ActiveMQ:  

非常成熟,功能強大,在業內大量的公司以及項目中都有應用

偶爾會有較低概率丟失消息

 而且現在社區以及國內應用都越來越少,官方社區現在對ActiveMQ 5.x維護越來越少,幾個月才發布一個版本

 而且確實主要是基於解耦和異步來用的,較少在大規模吞吐的場景中使用

     RabbitMQ:

erlang語言開發,性能極其好,延時很低;

吞吐量到萬級,MQ功能比較完備

 而且開源提供的管理界面非常棒,用起來很好用

 社區相對比較活躍,幾乎每個月都發布幾個版本分

 在國內一些互聯網公司近幾年用rabbitmq也比較多一些

 但是問題也是顯而易見的,RabbitMQ確實吞吐量會低一些,這是因為他做的實現機制比較重。

 而且erlang開發,國內有幾個公司有實力做erlang源碼級別的研究和定制?如果說你沒這個實力的話,確實偶爾會有一些問題,你很難去看懂源碼,你公司對這個東西的掌控很弱,基本職能依賴於開源社區的快速維護和修復bug。

 而且rabbitmq集群動態擴展會很麻煩,不過這個我覺得還好。其實主要是erlang語言本身帶來的問題。很難讀源碼,很難定制和掌控。

     RocketMQ:

 接口簡單易用,而且畢竟在阿里大規模應用過,有阿里品牌保障

 日處理消息上百億之多,可以做到大規模吞吐,性能也非常好,分布式擴展也很方便,社區維護還可以,可靠性和可用性都是ok的,還可以支撐大規模的topic數量,支持復雜MQ業務場景

 而且一個很大的優勢在於,阿里出品都是java系的,我們可以自己閱讀源碼,定制自己公司的MQ,可以掌控

 社區活躍度相對較為一般,不過也還可以,文檔相對來說簡單一些,然后接口這塊不是按照標准JMS規范走的有些系統要遷移需要修改大量代碼

 還有就是阿里出台的技術,你得做好這個技術萬一被拋棄,社區黃掉的風險,那如果你們公司有技術實力我覺得用RocketMQ挺好的

        kafka:

 kafka的特點其實很明顯,就是僅僅提供較少的核心功能,但是提供超高的吞吐量,ms級的延遲,極高的可用性以及可靠性,而且分布式可以任意擴展

 同時kafka最好是支撐較少的topic數量即可,保證其超高吞吐量

 而且kafka唯一的一點劣勢是有可能消息重復消費,那么對數據准確性會造成極其輕微的影響,在大數據領域中以及日志采集中,這點輕微影響可以忽略

 這個特性天然適合大數據實時計算以及日志收集

 
綜上所述,各種對比之后,我個人傾向於是:

一般的業務系統要引入MQ,最早大家都用ActiveMQ,但是現在確實大家用的不多了,沒經過大規模吞吐量場景的驗證,社區也不是很活躍,所以大家還是算了吧,我個人不推薦用這個了;

后來大家開始用RabbitMQ,但是確實erlang語言阻止了大量的java工程師去深入研究和掌控他,對公司而言,幾乎處於不可控的狀態,但是確實人是開源的,比較穩定的支持,活躍度也高;

不過現在確實越來越多的公司,會去用RocketMQ,確實很不錯,但是我提醒一下自己想好社區萬一突然黃掉的風險,對自己公司技術實力有絕對自信的,我推薦用RocketMQ,否則回去老老實實用RabbitMQ吧,人是活躍開源社區,絕對不會黃

所以中小型公司,技術實力較為一般,技術挑戰不是特別高,用RabbitMQ是不錯的選擇;大型公司,基礎架構研發實力較強,用RocketMQ是很好的選擇

如果是大數據領域的實時計算、日志采集等場景,用Kafka是業內標准的,絕對沒問題,社區活躍度很高,絕對不會黃,何況幾乎是全世界這個領域的事實性規范

 

1、面試題

如何保證消息的可靠性傳輸(如何處理消息丟失的問題)?(基本也是必考的吧)

我們從下面幾個方面來分析

1)生產者弄丟了數據

生產者將數據發送到rabbitmq的時候,可能數據就在半路給搞丟了,因為網絡啥的問題,都有可能。

此時可以選擇用rabbitmq提供的事務功能,就是生產者發送數據之前開啟rabbitmq事務(channel.txSelect),然后發送消息,如果消息沒有成功被rabbitmq接收到,那么生產者會收到異常報錯,此時就可以回滾事務(channel.txRollback),然后重試發送消息;如果收到了消息,那么可以提交事務(channel.txCommit)。但是問題是,rabbitmq事務機制一搞,基本上吞吐量會下來,因為太耗性能。

所以一般來說,如果你要確保說寫rabbitmq的消息別丟,可以開啟confirm模式,在生產者那里設置開啟confirm模式之后,你每次寫的消息都會分配一個唯一的id,然后如果寫入了rabbitmq中,rabbitmq會給你回傳一個ack消息,告訴你說這個消息ok了。如果rabbitmq沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存里維護每個消息id的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發。

事務機制和cnofirm機制最大的不同在於,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是confirm機制是異步的,你發送個消息之后就可以發送下一個消息,然后那個消息rabbitmq接收了之后會異步回調你一個接口通知你這個消息接收到了。

所以一般在生產者這塊避免數據丟失,都是用confirm機制的。

 

2)rabbitmq弄丟了數據

就是rabbitmq自己弄丟了數據,這個你必須開啟rabbitmq的持久化,就是消息寫入之后會持久化到磁盤,哪怕是rabbitmq自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。除非極其罕見的是,rabbitmq還沒持久化,自己就掛了,可能導致少量數據會丟失的,但是這個概率較小。

設置持久化有兩個步驟,第一個是創建queue的時候將其設置為持久化的,這樣就可以保證rabbitmq持久化queue的元數據,但是不會持久化queue里的數據;第二個是發送消息的時候將消息的deliveryMode設置為2,就是將消息設置為持久化的,此時rabbitmq就會將消息持久化到磁盤上去。必須要同時設置這兩個持久化才行,rabbitmq哪怕是掛了,再次重啟,也會從磁盤上重啟恢復queue,恢復這個queue里的數據。

而且持久化可以跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤之后,才會通知生產者ack了,所以哪怕是在持久化到磁盤之前,rabbitmq掛了,數據丟了,生產者收不到ack,你也是可以自己重發的。

哪怕是你給rabbitmq開啟了持久化機制,也有一種可能,就是這個消息寫到了rabbitmq中,但是還沒來得及持久化到磁盤上,結果不巧,此時rabbitmq掛了,就會導致內存里的一點點數據會丟失。

 

3)消費端弄丟了數據

rabbitmq如果丟失了數據,主要是因為你消費的時候,剛消費到,還沒處理,結果進程掛了,比如重啟了,那么就尷尬了,rabbitmq認為你都消費了,這數據就丟了。

這個時候得用rabbitmq提供的ack機制,簡單來說,就是你關閉rabbitmq自動ack,可以通過一個api來調用就行,然后每次你自己代碼里確保處理完的時候,再程序里ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那rabbitmq就認為你還沒處理完,這個時候rabbitmq會把這個消費分配給別的consumer去處理,消息是不會丟的。

(2)kafka

1)消費端弄丟了數據

唯一可能導致消費者弄丟數據的情況,就是說,你那個消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經消費好了這個消息,其實你剛准備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。

這不是一樣么,大家都知道kafka會自動提交offset,那么只要關閉自動提交offset,在處理完之后自己手動提交offset,就可以保證數據不會丟。但是此時確實還是會重復消費,比如你剛處理完,還沒提交offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。

 

生產環境碰到的一個問題,就是說我們的kafka消費者消費到了數據之后是寫到一個內存的queue里先緩沖一下,結果有的時候,你剛把消息寫入內存queue,然后消費者會自動提交offset。

 

然后此時我們重啟了系統,就會導致內存queue里還沒來得及處理的數據就丟失了

 

2)kafka弄丟了數據

 

 

這塊比較常見的一個場景,就是kafka某個broker宕機,然后重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些數據沒有同步,結果此時leader掛了,然后選舉某個follower成leader之后,他不就少了一些數據?這就丟了一些數據啊。

生產環境也遇到過,我們也是,之前kafka的leader機器宕機了,將follower切換為leader之后,就會發現說這個數據就丟了

所以此時一般是要求起碼設置如下4個參數:

給這個topic設置replication.factor參數:這個值必須大於1,要求每個partition必須有至少2個副本

在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確保leader掛了還有一個follower吧

在producer端設置acks=all:這個是要求每條數據,必須是寫入所有replica之后,才能認為是寫成功了

在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了

我們生產環境就是按照上述要求配置的,這樣配置之后,至少在kafka broker端就可以保證在leader所在broker發生故障,進行leader切換時,數據不會丟失

 

3)生產者會不會弄丟數據

如果按照上述的思路設置了ack=all,一定不會丟,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。

 

問題:        如何保證消息隊列的高可用啊?

 

RabbitMQ是比較有代表性的,因為是基於主從做高可用性的,我們就以他為例子講解第一種MQ的高可用性怎么實現。

 rabbitmq有三種模式:單機模式,普通集群模式,鏡像集群模式

 1)單機模式

 就是demo級別的,一般就是你本地啟動了玩玩兒的,沒人生產用單機模式

 

2)普通集群模式

 意思就是在多台機器上啟動多個rabbitmq實例,每個機器啟動一個。但是你創建的queue,只會放在一個rabbtimq實例上,但是每個實例都同步queue的元數據。完了你消費的時候,實際上如果連接到了另外一個實例,那么那個實例會從queue所在實例上拉取數據過來。

 這種方式確實很麻煩,也不怎么好,沒做到所謂的分布式,就是個普通集群。因為這導致你要么消費者每次隨機連接一個實例然后拉取數據,要么固定連接那個queue所在實例消費數據,前者有數據拉取的開銷,后者導致單實例性能瓶頸。

 而且如果那個放queue的實例宕機了,會導致接下來其他實例就無法從那個實例拉取,如果你開啟了消息持久化,讓rabbitmq落地存儲消息的話,消息不一定會丟,得等這個實例恢復了,然后才可以繼續從這個queue拉取數據。

 所以這個事兒就比較尷尬了,這就沒有什么所謂的高可用性可言了,這方案主要是提高吞吐量的,就是說讓集群中多個節點來服務某個queue的讀寫操作。

 

 

3)鏡像集群模式

 這種模式,才是所謂的rabbitmq的高可用模式,跟普通集群模式不一樣的是,你創建的queue,無論元數據還是queue里的消息都會存在於多個實例上,然后每次你寫消息到queue的時候,都會自動把消息到多個實例的queue里進行消息同步。

 這樣的話,好處在於,你任何一個機器宕機了,沒事兒,別的機器都可以用。壞處在於,第一,這個性能開銷也太大了吧,消息同步所有機器,導致網絡帶寬壓力和消耗很重!第二,這么玩兒,就沒有擴展性可言了,如果某個queue負載很重,你加機器,新增的機器也包含了這個queue的所有數據,並沒有辦法線性擴展你的queue

 那么怎么開啟這個鏡像集群模式呢?我這里簡單說一下,避免面試人家問你你不知道,其實很簡單rabbitmq有很好的管理控制台,就是在后台新增一個策略,這個策略是鏡像集群模式的策略,指定的時候可以要求數據同步到所有節點的,也可以要求就同步到指定數量的節點,然后你再次創建queue的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。

 

(2)kafka的高可用性

 

kafka一個最基本的架構認識:多個broker組成,每個broker是一個節點;你創建一個topic,這個topic可以划分為多個partition,每個partition可以存在於不同的broker上,每個partition就放一部分數據。

 這就是天然的分布式消息隊列,就是說一個topic的數據,是分散放在多個機器上的,每個機器就放一部分數據。

 實際上rabbitmq之類的,並不是分布式消息隊列,他就是傳統的消息隊列,只不過提供了一些集群、HA的機制而已,因為無論怎么玩兒,rabbitmq一個queue的數據都是放在一個節點里的,鏡像集群下,也是每個節點都放這個queue的完整數據。

 kafka 0.8以前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什么高可用性可言。

 kafka 0.8以后,提供了HA機制,就是replica副本機制。每個partition的數據都會同步到吉他機器上,形成自己的多個replica副本。然后所有replica會選舉一個leader出來,那么生產和消費都跟這個leader打交道,然后其他replica就是follower。寫的時候,leader會負責把數據同步到所有follower上去,讀的時候就直接讀leader上數據即可。只能讀寫leader?很簡單,要是你可以隨意讀寫每個follower,那么就要care數據一致性的問題,系統復雜度太高,很容易出問題。kafka會均勻的將一個partition的所有replica分布在不同的機器上,這樣才可以提高容錯性。

 這么搞,就有所謂的高可用性了,因為如果某個broker宕機了,沒事兒,那個broker上面的partition在其他機器上都有副本的,如果這上面有某個partition的leader,那么此時會重新選舉一個新的leader出來,大家繼續讀寫那個新的leader即可。這就有所謂的高可用性了。

 寫數據的時候,生產者就寫leader,然后leader將數據落地寫本地磁盤,接着其他follower自己主動從leader來pull數據。一旦所有follower同步好數據了,就會發送ack給leader,leader收到所有follower的ack之后,就會返回寫成功的消息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)

 消費的時候,只會從leader去讀,但是只有一個消息已經被所有follower都同步成功返回ack的時候,這個消息才會被消費者讀到。

 

 

 

面試題

如何保證消息不被重復消費啊(如何保證消息消費時的冪等性)?

 

2、面試官心里分析

 

其實這個很常見的一個問題,這倆問題基本可以連起來問。既然是消費消息,那肯定要考慮考慮會不會重復消費?能不能避免重復消費?或者重復消費了也別造成系統異常可以嗎?這個是MQ領域的基本問題,其實本質上還是問你使用消息隊列如何保證冪等性,這個是你架構里要考慮的一個問題。

面試官問你,肯定是必問的,這是你要考慮的實際生產上的系統設計問題。

 

3、面試題剖析

 

回答這個問題,首先你別聽到重復消息這個事兒,就一無所知吧,你先大概說一說可能會有哪些重復消費的問題。

首先就是比如rabbitmq、rocketmq、kafka,都有可能會出現消費重復消費的問題,正常。因為這問題通常不是mq自己保證的,是給你保證的。然后我們挑一個kafka來舉個例子,說說怎么重復消費吧。

 

 

kafka實際上有個offset的概念,就是每個消息寫進去,都有一個offset,代表他的序號,然后consumer消費了數據之后,每隔一段時間,會把自己消費過的消息的offset提交一下,代表我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的offset來繼續消費吧。

但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎么重啟了,如果碰到點着急的,直接kill進程了,再重啟。這會導致consumer有些消息處理了,但是沒來得及提交offset,尷尬了。重啟之后,少數消息會再次消費一次。

其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性。

給你舉個例子吧。假設你有個系統,消費一條往數據庫里插入一條,要是你一個消息重復兩次,你不就插入了兩條,這數據不就錯了?但是你要是消費到第二次的時候,自己判斷一下已經消費過了,直接扔了,不就保留了一條數據?

一條數據重復出現兩次,數據庫里就只有一條數據,這就保證了系統的冪等性

冪等性,我通俗點說,就一個數據,或者一個請求,給你重復來多次,你得確保對應的數據是不會改變的,不能出錯。

那所以第二個問題來了,怎么保證消息隊列消費的冪等性?

 

 

其實還是得結合業務來思考,我這里給幾個思路:

(1)比如你拿個數據要寫庫,你先根據主鍵查一下,如果這數據都有了,你就別插入了,update一下好吧

(2)比如你是寫redis,那沒問題了,反正每次都是set,天然冪等性

(3)比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發送每條數據的時候,里面加一個全局唯一的id,類似訂單id之類的東西,然后你這里消費到了之后,先根據這個id去比如redis里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個id寫redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。

還有比如基於數據庫的唯一鍵來保證重復數據不會重復插入多條,我們之前線上系統就有這個問題,就是拿到數據的時候,每次重啟可能會有重復,因為kafka消費者還沒來得及提交offset,重復數據拿到了以后我們插入的時候,因為有唯一鍵約束了,所以重復數據只會插入報錯,不會導致數據庫中出現臟數據

 如何保證MQ的消費是冪等性的,需要結合具體的業務來看

 

如何保證消息的順序性?

2、面試官心里分析

其實這個也是用MQ的時候必問的話題,第一看看你了解不了解順序這個事兒?第二看看你有沒有辦法保證消息是有順序的?這個生產系統中常見的問題。

3、面試題剖析

我舉個例子,我們以前做過一個mysql binlog同步的系統,壓力還是非常大的,日同步數據要達到上億。mysql -> mysql,常見的一點在於說大數據team,就需要同步一個mysql庫過來,對公司的業務系統的數據做各種復雜的操作。

你在mysql里增刪改一條數據,對應出來了增刪改3條binlog,接着這三條binlog發送到MQ里面,到消費出來依次執行,起碼得保證人家是按照順序來的吧?不然本來是:增加、修改、刪除;你楞是換了順序給執行成刪除、修改、增加,不全錯了么。

本來這個數據同步過來,應該最后這個數據被刪除了;結果你搞錯了這個順序,最后這個數據保留下來了,數據同步就出錯了。

先看看順序會錯亂的倆場景

(1)rabbitmq:一個queue,多個consumer,這不明顯亂了

 

(2)kafka:一個topic,一個partition,一個consumer,內部多線程,這不也明顯亂了

 

 

那如何保證消息的順序性呢?簡單簡單

(1)rabbitmq:拆分多個queue,每個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue但是對應一個consumer,然后這個consumer內部用內存隊列做排隊,然后分發給底層不同的worker來處理

 

 

(2)kafka:一個topic,一個partition,一個consumer,內部單線程消費,寫N個內存queue,然后N個線程分別消費一個內存queue即可

 

 

 

如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以后該怎么處理?有幾百萬消息持續積壓幾小時,說說怎么解決?

2、面試官心里分析

你看這問法,其實本質針對的場景,都是說,可能你的消費端出了問題,不消費了,或者消費的極其極其慢。接着就坑爹了,可能你的消息隊列集群的磁盤都快寫滿了,都沒人消費,這個時候怎么辦?或者是整個這就積壓了幾個小時,你這個時候怎么辦?或者是你積壓的時間太長了,導致比如rabbitmq設置了消息過期時間后就沒了怎么辦?

所以就這事兒,其實線上挺常見的,一般不出,一出就是大case,一般常見於,舉個例子,消費端每次消費之后要寫mysql,結果mysql掛了,消費端hang那兒了,不動了。或者是消費端出了個什么叉子,導致消費速度極其慢。

3、面試題分析

關於這個事兒,我們一個一個來梳理吧,先假設一個場景,我們現在消費端出故障了,然后大量消息在mq里積壓,現在事故了,慌了

(1)大量消息在mq里積壓了幾個小時了還沒解決

幾千萬條數據在MQ里積壓了七八個小時,從下午4點多,積壓到了晚上很晚,10點多,11點多

這個是我們真實遇到過的一個場景,確實是線上故障了,這個時候要不然就是修復consumer的問題,讓他恢復消費速度,然后傻傻的等待幾個小時消費完畢。這個肯定不能在面試的時候說吧。

一個消費者一秒是1000條,一秒3個消費者是3000條,一分鍾是18萬條,1000多萬條

所以如果你積壓了幾百萬到上千萬的數據,即使消費者恢復了,也需要大概1小時的時間才能恢復過來

一般這個時候,只能操作臨時緊急擴容了,具體操作步驟和思路如下:

1)先修復consumer的問題,確保其恢復消費速度,然后將現有cnosumer都停掉

2)新建一個topic,partition是原來的10倍,臨時建立好原先10倍或者20倍的queue數量

3)然后寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數量的queue

4)接着臨時征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據

5)這種做法相當於是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據

6)等快速消費完積壓數據之后,得恢復原先部署架構,重新用原先的consumer機器來消費消息

(2)這里我們假設再來第二個坑

假設你用的是rabbitmq,rabbitmq是可以設置過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在mq里,而是大量的數據會直接搞丟。

這個情況下,就不是說要增加consumer消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當時就直接丟棄數據了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。

這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然后重新灌入mq里面去,把白天丟的數據給他補回來。也只能是這樣了。

假設1萬個訂單積壓在mq里面,沒有處理,其中1000個訂單都丟了,你只能手動寫程序把那1000個訂單給查出來,手動發到mq里去再補一次

(3)然后我們再來假設第三個坑

如果走的方式是消息積壓在mq里,那么如果你很長時間都沒處理掉,此時導致mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然后走第二個方案,到了晚上再補數據吧。

 

 

 

 

如果讓你寫一個消息隊列,該如何進行架構設計啊?說一下你的思路

2、面試官心里分析

其實聊到這個問題,一般面試官要考察兩塊:

(1)你有沒有對某一個消息隊列做過較為深入的原理的了解,或者從整體了解把握住一個mq的架構原理

(2)看看你的設計能力,給你一個常見的系統,就是消息隊列系統,看看你能不能從全局把握一下整體架構設計,給出一些關鍵點出來

說實話,我一般面類似問題的時候,大部分人基本都會蒙,因為平時從來沒有思考過類似的問題,大多數人就是平時埋頭用,從來不去思考背后的一些東西。類似的問題,我經常問的還有,如果讓你來設計一個spring框架你會怎么做?如果讓你來設計一個dubbo框架你會怎么做?如果讓你來設計一個mybatis框架你會怎么做?

3、面試題剖析

其實回答這類問題,說白了,起碼不求你看過那技術的源碼,起碼你大概知道那個技術的基本原理,核心組成部分,基本架構構成,然后參照一些開源的技術把一個系統設計出來的思路說一下就好

比如說這個消息隊列系統,我們來從以下幾個角度來考慮一下

(1)首先這個mq得支持可伸縮性吧,就是需要的時候快速擴容,就可以增加吞吐量和容量,那怎么搞?設計個分布式的系統唄,參照一下kafka的設計理念,broker -> topic -> partition,每個partition放一個機器,就存一部分數據。如果現在資源不夠了,簡單啊,給topic增加partition,然后做數據遷移,增加機器,不就可以存放更多數據,提供更高的吞吐量了?

(2)其次你得考慮一下這個mq的數據要不要落地磁盤吧?那肯定要了,落磁盤,才能保證別進程掛了數據就丟了。那落磁盤的時候怎么落啊?順序寫,這樣就沒有磁盤隨機讀寫的尋址開銷,磁盤順序讀寫的性能是很高的,這就是kafka的思路。

(3)其次你考慮一下你的mq的可用性啊?這個事兒,具體參考我們之前可用性那個環節講解的kafka的高可用保障機制。多副本 -> leader & follower -> broker掛了重新選舉leader即可對外服務。

(4)能不能支持數據0丟失啊?可以的,參考我們之前說的那個kafka數據零丟失方案

其實一個mq肯定是很復雜的,面試官問你這個問題,其實是個開放題,他就是看看你有沒有從架構角度整體構思和設計的思維以及能力。確實這個問題可以刷掉一大批人,因為大部分人平時不思考這些東西。

 

消息什么情況下會丟失?配合mandatory參數或備份交換器來提高程序的健壯性
發送消息的交換器並沒有綁定任何隊列,消息將會丟失
交換器綁定了某個隊列,但是發送消息時的路由鍵無法與現存的隊列匹配


預估隊列的使用情況?
在后期運行過程中超過預定的閾值,可以根據實際情況對當前集群進行擴容或者將相應的隊列遷移到其他集群。
消費消息?
推模式,拉模式
保證消息的可靠性?
RabbitMQ 提供了消息確認機制( message acknowledgement)。 消費者在訂閱隊列時,可以指定 autoAck 參數,當 autoAck 等於 false 時, RabbitMQ 會等待消費者顯式地回復確認信號后才從內存(或者磁盤)中移去消息(實質上
是先打上刪除標記,之后再刪除)。當 autoAck 等於 true 時, RabbitMQ 會自動把發送出去的 消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。


在ack為false的情況下,消費者獲取消息遲遲沒有發送消費者確認消息的信號或者消費者斷開,怎么辦?
當 autoAck 參數置為 false,對於 RabbitMQ 服務端而言,隊列中的消息分成了兩個部分: 一部分是等待投遞給消費者的消息:一部分是己經投遞給消費者,但是還沒有收到消費者確認信號的消息。 如果 RabbitMQ 一直沒有收到消費者的確認信號,並且消費此消息的消費者己經 斷開連接,則 RabbitMQ 會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可 能還是原來的那個消費者。RabbitMQ 不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否己經斷開,這么設計的原因是 RabbitMQ 允許消費者 消費一條消息的時間可以很久很久。


在消費者接收到消息后,如果想明確拒絕當前的消息而不是確認,那么應該怎么做呢?
RabbitMQ 在 2.0.0 版本開始引入了 Basic .Reject 這個命令,消費者客戶端可以調用與其對 應的 channel.basicReject 方法來告訴 RabbitMQ 拒絕這個消息。

//Channel 類中的 basicReject 方法定義如下:
//其中 deliveryTag 可以看作消息的編號 ,它是一個 64 位的長整型值,最大值是 9223372036854775807。如果 //requeue 參數設置為 true,則 RabbitMQ 會重新將這條消息存入 隊列,以便可以發送給下一個訂閱的消費者;如果 //requeue 參數設置為 false,則 RabbitMQ 立即會把消息從隊列中移除,而不會把它發送給新的消費者。
void basicReject(long deliveryTag, boolean requeue) throws IOException

注意:Basic.Reject 命令一次只能拒絕一條消息 ,如果想要批量拒絕消息 ,則可以使用 Basic.Nack 這個命令

//消費者客戶端可以調用 channel.basicNack 方法來實現,方法定 義如下:
//其中 deliveryTag 和 requeue 的含義可以參考 basicReject 方法。 multiple 參數
//設置為 false 則表示拒絕編號為 deliveryT坷的這一條消息,這時候 basicNack 和 basicReject 方法一樣; //multiple 參數設置為 true 則表示拒絕 deliveryTag 編號之前所 有未被當前消費者確認的消息。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException
注意:
將 channel.basicReject 或者 channel.basicNack 中的 requeue 設直為 false,可以啟用”死信隊列”的功能。死信隊列可以通過檢測被拒絕或者未送達的消息來追蹤問題

請求RabbitMQ重新發送還未被確認的消息?
 //Basic.Recover 具備可重入隊列的特性
 Basic.RecoverOk basicRecover() throws IOException;
 Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
channel.basicRecover 方法用來請求 RabbitMQ 重新發送還未被確認的消息。 如果 requeue 參數設置為 true,則未被確認的消息會被重新加入到隊列中,這樣對於同一條消息 來說,可能會被分配給與之前不同的消費者。如果 requeue 參數設置為 false,那么同一條消 息會被分配給與之前相同的消費者。默認情況下,如果不設置 requeue 這個參數,相當於
channel.basicRecover(true) ,即 requeue 默認為 true
交換器無法根據自身的類型和路由鍵找到一個符合條件 的隊列
當 mandatory 參數設為 true 時,交換器無法根據自身的類型和路由鍵找到一個符合條件 的隊列,那么 RabbitMQ 會調用 Basic.Return 命令將消息返回給生產者。當 mandatory 參 數設置為 false 時,出現上述情形,則消息直接被丟棄

生產者如何獲取到沒有被正確路由到合適隊列的消息呢?
可以通過調用channel.addReturnListener來添加ReturnListener監聽器實現。RabbitMQ 會通過 Basic . Return 返回 “mandatory test” 這條消息,之后生產者客戶端通過 ReturnListener 監昕到了這個事 件,上面代碼的最后輸出應該是”Basic.Retum 返回的結果是: mandatory test”


mandatory和immediate參數的區別
mandatory 參數告訴服務器至少將該消息路由到一個隊列中, 否則將消息返 回給生產者。 immediate 參數告訴服務器, 如果該消息關聯的隊列上有消費者, 則立刻投遞: 如果所有匹配的隊列上都沒有消費者,則直接將消息返還給生產者, 不用將消息存入隊列而等 待消費者了。

未被路由到的消息應該怎么處理?
發送消息的時候設置mandatory參數,添加ReturnListener監聽器接收未被路由到的返回消息
采用備份交換器AE,可以將未被路由的消息存儲在RabbitMQ中,通過聲明交換器的時候添加AE參數實現,或者通過策略的方式實現,同時使用,前者優先級高,會覆蓋掉Policy的設置


備份交換器需要注意?
如果設置的備份交換器不存在,客戶端和RabbitMQ服務端都不會有異常出現,此時消息會丟失
如果備份交換器沒有綁定任何隊列,客戶端和RabbitMQ服務端都不會有異常出現,此時消息會丟失
如果備份交換器沒有任何匹配的隊列,客戶端和RabbitMQ服務端都不會有異常出現,此時消息會丟失
如果備份交換器和mandatory參數一起使用,那么mandatory參數無效


怎么為消息設置過期時間TTL?
通過隊列屬性設置,隊列中所有消息都有相同的過期時間,聲明隊列的時候在channel.queueDeclare加入TTL參數
對消息本身進行單獨設置,每條消息的TTL可以不同,在channel.basicPublish方法參數中設置
同時使用以上兩種方式設置過期時間,以較小的為准
消息在隊列中的生存時間一旦超過設置的TTL值,就變成死信,消費者無法再收到該消息(不是絕對的)
如果不設置 TTL.則表示此消息不會過期;如果將 TTL 設置為 0,則表示除非此時可以直接將消息投遞到消費者,否則該消息會被立即丟棄,這個特性可以部分替代 RabbitMQ 3.0 版本之前的 immediate 參數


對過期消息處理?
設置隊列 TTL 屬性的方法,一旦消息過期,就會從隊列中抹去,隊列中己過期的消息肯定在隊 列頭部, RabbitMQ 只要定期從隊頭開始掃描是否有過期的消息即可,
消息本身進行單獨設置,即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即將投遞到消費者之前判定的。每條消息的過期時間不同,如果要刪除所有過期消息勢必要掃描整個隊列,所以不如等到此消息即將 被消費時再判定是否過期, 如果過期再進行刪除即可。


怎么設置隊列的過期時間?
通過 channel . queueDeclare 方法中的 x-expires 參數可以控制隊列被自動刪除前處 於未使用狀態的時間。未使用的意思是隊列上沒有任何的消費者,隊列也沒有被重新聲明,並且在過期時間段內也未調用過 Basic . Get 命令。
RabbitMQ 會確保在過期時間到達后將隊列刪除,但是不保障刪除的動作有多及時 。在 RabbitMQ 重啟后,持久化的隊列的過期時間會被重新計算。


什么是死信隊列?
DLX,全稱為 Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當消息在一個隊列中變成死信 (dead message) 之后,它能被重新被發送到另一個交換器中,這個交換器就是 DLX,綁定 DLX 的隊列就稱之為死信隊列。
DLX 也是一個正常的交換器,和一般的交換器沒有區別,它能在任何的隊列上被指定, 實 際上就是設置某個隊列的屬性。當這個隊列中存在死信時 , RabbitMQ 就會自動地將這個消息重新發布到設置的 DLX 上去,進而被路由到另一個隊列,即死信隊列。


什么是延遲隊列?
​ 延遲隊列存儲的對象是對應的延遲消息,所謂“延遲消息”是指當消息被發送后,並不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費

延遲隊列應用場景?
訂單系統,用延遲隊列處理超時訂單
用戶希望通過手機遠程遙控家里的智能設備在指定的時間進行工作。這時候就可以將 用戶指令發送到延遲隊列,當指令設定的時間到了再將指令推送到智能設備。
持久化?

交換器的持久化
交換器的持久化是通過在聲明交換器時將 durable 參數置為 true 實現的,如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換器元數據會丟失, 不過消息不會丟失,只是不能將消息發送到這個交換器中了。對一個長期使用的交換器來說,建議將其置為持久化的。

隊列的持久化
隊列的持久化是通過在聲明隊列時將 durable 參數置為 true 實現的,如果隊列不設置持久化,那么在 RabbitMQ 服務重啟之后,相關隊列的元數據會丟失,此時數據也會丟失。

消息的持久化
通過將消息的投遞模式 (BasicProperties 中的 deliveryMode 屬性)設置為 2 即可實現消息的持久化。

在這段時間內 RabbitMQ 服務節點發生了岩機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將RabbitMQ 實戰指南會丟失。這個問題怎么解決呢?
​ 可以引入 RabbitMQ 的鏡像隊列機制,相當於配置了副本,如果主節點 Cmaster) 在此特殊時間內掛掉,可以自動切換到從節點 Cslave ), 這樣有效地保證了高可用性

當消息的生產者將消息發送出去之后,消息到底有沒有正確地到達服務器呢?
通過事務機制實現,比較消耗性能
客戶端發送 Tx.Select. 將信道置為事務模式;
Broker 回復 Tx. Select-Ok. 確認己將信道置為事務模式:
在發送完消息之后,客戶端發送 Tx.Commit 提交事務;
Broker 回復 Tx. Commi t-Ok. 確認事務提交。
通過發送方確認機制實現

消費端對消息的處理?
過推模式或者拉模式的方 式來獲取井消費消息,當消費者處理完業務邏輯需要手動確認消息己被接收,這RabbitMQ才能把當前消息從隊列中標記清除
如果消費者由於某些原因無法處理當前接收到的消息, 可以通過 channel . basicNack 或者 channel . basicReject 來拒絕掉。
消費端存在的問題?

消息分發
同一個隊列擁有多個消費者,會采用輪詢的方式分發消息給消費者,若其中有的消費者任務重,有的消費者很快處理完消息,導致進程空閑,這樣對導致整體應用吞吐量下降,為了解決上面的問題,用到channel.basicQos 方法允許限制信道上的消費者所能保持的最大未確認消息的數量。Basic.Qos 的使用對於拉模式的消費方式無效.

舉例如下:
在訂閱消費隊列之前,消費端程序調用了 channel.basicQos(5) ,之后訂 閱了某個隊列進行消費。 RabbitMQ 會保存一個消費者的列表,每發送一條消息都會為對應的消費者計數,如果達到了所設定的上限,那么 RabbitMQ 就不會向這個消費者再發送任何消息。 直到消費者確認了某條消息之后 , RabbitMQ將相應的計數減1,之后消費者可以繼續接收消息, 直到再次到達計數上限。這種機制可以類比於 TCP!IP中的”滑動窗口”。

消息順序性
生產者使用了事務機制可能會破壞消息順序性
生產者發送消息設置了不同的超時時間,並且設置了死信隊列
消息設置了優先級
可以考慮在消息體內添加全局有序標識來實現

棄用QueueingConsumer,Spring提供的RabbitMQ采用的是DefaultConsume
內存溢出,由於某些原因,隊列之中堆積了比較多的消息,可能導致消費者客戶端內存溢出假死,發生惡性循環,使用 Basic . Qos 來解決,一定要在調用 Basic . Consume 之前調用 Basic.Qos
才能生效。
會拖累同一個connection下的所有信道,使其性能降低
同步遞歸調用QueueingConsumer會產生死鎖
RabbitMQ的自動連接恢復機制不支持QueueingConsumer這種形式
QueueingConsumer不是事件驅動的


消息傳輸保障?
一般消息中間件的消息傳輸保障分為三個等級
At most once: 最多一次。消息可能會丟失,但絕不會重復傳輸。
At least once: 最少一次。消息絕不會丟失,但可能會重復傳輸。
Exactly once: 恰好一次。每條消息肯定會被傳輸一次且僅傳輸一次。
RabbitMQ支持其中的“最多一次”和“最少一次”。
其中”最少一次”投遞實現需要考慮 以下這個幾個方面的內容:
消息生產者需要開啟事務機制或者 publisher confirm 機制,以確保消息可以可靠地傳 輸到 RabbitMQ 中。
消息生產者需要配合使用 mandatory 參數或者備份交換器來確保消息能夠從交換器 路由到隊列中,進而能夠保存下來而不會被丟棄。
消息和隊列都需要進行持久化處理,以確保 RabbitMQ 服務器在遇到異常情況時不會造成消息丟失。
消費者在消費消息的同時需要將 autoAck 設置為 false,然后通過手動確認的方式去 確認己經正確消費的消息,以避免在消費端引起不必要的消息丟失。
“最多一次”的方式就無須考慮以上那些方面,生產者隨意發送,消費者隨意消費,不過這 樣很難確保消息不會丟失。


提高數據可靠性途徑?
設置 mandatory 參數或者備份交換器 (immediate 參數己被陶汰);
設置 publisher conflITll機制或者事務;
設置交換器、隊列和消息都為持久化;
設置消費端對應的 autoAck 參數為 false 井在消費完消息之后再進行消息確認

 

rabbit面試題
1.什么是rabbitmq
采用AMQP高級消息隊列協議的一種消息隊列技術,最大的特點就是消費並不需要確保提供方存在,實現了服務之間的高度解耦

2.為什么要使用rabbitmq
1.在分布式系統下具備異步,削峰,負載均衡等一系列高級功能;
2.擁有持久化的機制,進程消息,隊列中的信息也可以保存下來。
3.實現消費者和生產者之間的解耦。
4.對於高並發場景下,利用消息隊列可以使得同步訪問變為串行訪問達到一定量的限流,利於數據庫的操作。
5.可以使用消息隊列達到異步下單的效果,排隊中,后台進行邏輯下單。

3.使用rabbitmq的場景
1.服務間異步通信
2.順序消費
3.定時任務
4.請求削峰

4.如何確保消息正確地發送至RabbitMQ? 如何確保消息接收方消費了消息?
發送方確認模式:
將信道設置成confirm模式(發送方確認模式),則所有在信道上發布的消息都會被指派一個唯一的ID。
一旦消息被投遞到目的隊列后,或者消息被寫入磁盤后(可持久化的消息),信道會發送一個確認給生產者(包含消息唯一ID)。
如果RabbitMQ發生內部錯誤從而導致消息丟失,會發送一條nack(not acknowledged,未確認)消息。
發送方確認模式是異步的,生產者應用程序在等待確認的同時,可以繼續發送消息。當確認消息到達生產者應用程序,生產者應用程序的回調方法就會被觸發來處理確認消息。
接收方確認機制
接收方消息確認機制:消費者接收每一條消息后都必須進行確認(消息接收和消息確認是兩個不同操作)。只有消費者確認了消息,RabbitMQ才能安全地把消息從隊列中刪除。
這里並沒有用到超時機制,RabbitMQ僅通過Consumer的連接中斷來確認是否需要重新發送消息。也就是說,只要連接不中斷,RabbitMQ給了Consumer足夠長的時間來處理消息。保證數據的最終一致性;
下面羅列幾種特殊情況:
如果消費者接收到消息,在確認之前斷開了連接或取消訂閱,RabbitMQ會認為消息沒有被分發,然后重新分發給下一個訂閱的消費者。(可能存在消息重復消費的隱患,需要去重)
如果消費者接收到消息卻沒有確認消息,連接也未斷開,則RabbitMQ認為該消費者繁忙,將不會給該消費者分發更多的消息。

5.如何避免消息重復投遞或重復消費?
在消息生產時,MQ內部針對每條生產者發送的消息生成一個inner-msg-id,作為去重的依據(消息投遞失敗並重傳),避免重復的消息進入隊列;
在消息消費時,要求消息體中必須要有一個bizId(對於同一業務全局唯一,如支付ID、訂單ID、帖子ID等)作為去重的依據,避免同一條消息被重復消費。
6.消息基於什么傳輸?
由於TCP連接的創建和銷毀開銷較大,且並發數受系統資源限制,會造成性能瓶頸。RabbitMQ使用信道的方式來傳輸數據。信道是建立在真實的TCP連接內的虛擬連接,且每條TCP連接上的信道數量沒有限制。

7.消息如何分發?
若該隊列至少有一個消費者訂閱,消息將以循環(round-robin)的方式發送給消費者。每條消息只會分發給一個訂閱的消費者(前提是消費者能夠正常處理消息並進行確認)。
通過路由可實現多消費的功能

8.消息怎么路由?
消息提供方->路由->一至多個隊列
消息發布到交換器時,消息將擁有一個路由鍵(routing key),在消息創建時設定。
通過隊列路由鍵,可以把隊列綁定到交換器上。
消息到達交換器后,RabbitMQ會將消息的路由鍵與隊列的路由鍵進行匹配(針對不同的交換器有不同的路由規則);
常用的交換器主要分為一下三種:
fanout:如果交換器收到消息,將會廣播到所有綁定的隊列上
direct:如果路由鍵完全匹配,消息就被投遞到相應的隊列
topic:可以使來自不同源頭的消息能夠到達同一個隊列。 使用topic交換器時,可以使用通配符

9.如何確保消息不丟失?
消息持久化,當然前提是隊列必須持久化
RabbitMQ確保持久性消息能從服務器重啟中恢復的方式是,將它們寫入磁盤上的一個持久化日志文件,當發布一條持久性消息到持久交換器上時,Rabbit會在消息提交到日志文件后才發送響應。
一旦消費者從持久隊列中消費了一條持久化消息,RabbitMQ會在持久化日志中把這條消息標記為等待垃圾收集。如果持久化消息在被消費之前RabbitMQ重啟,那么Rabbit會自動重建交換器和隊列(以及綁定),並重新發布持久化日志文件中的消息到合適的隊列。

10.使用RabbitMQ有什么好處?
服務間高度解耦,
異步通信性能高,
流量削峰


11.rabbitmq的集群
鏡像集群模式
你創建的queue,無論元數據還是queue里的消息都會存在於多個實例上,然后每次你寫消息到queue的時候,都會自動把消息到多個實例的queue里進行消息同步。
好處在於,你任何一個機器宕機了,沒事兒,別的機器都可以用。壞處在於,第一,這個性能開銷也太大了吧,消息同步所有機器,導致網絡帶寬壓力和消耗很重!第二,這么玩兒,就沒有擴展性可言了,如果某個queue負載很重,你加機器,新增的機器也包含了這個queue的所有數據,並沒有辦法線性擴展你的queue

12.mq的缺點
系統可用性降低
系統引入的外部依賴越多,越容易掛掉,本來你就是A系統調用BCD三個系統的接口就好了,人ABCD四個系統好好的,沒啥問題,你偏加個MQ進來,萬一MQ掛了咋整?MQ掛了,整套系統崩潰了,你不就完了么。
系統復雜性提高:
硬生生加個MQ進來,你怎么保證消息沒有重復消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?頭大頭大,問題一大堆,痛苦不已
一致性問題:
A系統處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是BCD三個系統那里,BD兩個系統寫庫成功了,結果C系統寫庫失敗了,咋整?你這數據就不一致了。
所以消息隊列實際是一種非常復雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉,最好之后,你會發現,媽呀,系統復雜度提升了一個數量級,也許是復雜了10倍。但是關鍵時刻,用,還是得用的。。。

 

RabbitMQ 在上一家公司已經接觸過了, 但是懵懵懂懂的. 不是很清楚. 具體怎么個邏輯.
這次公司打算搭建新的系統. 領導要求研究一下MQ.
經過研究得出的結論是. MSMQ的設計理念不適合做系統的底層框架. 他不適合做分布式系統. 最主要的是. MSMQ如果沒有消費者, 默認消息是一直存在的.
而RabbitMQ的設計理念是.只要有接收消息的隊列. 郵件就會存放到隊列里. 直到訂閱人取走. . 如果沒有可以接收這個消息的消息隊列. 默認是拋棄這個消息的..
下面就把我的研究結果寫一下.
如何在新的系統中使用RabbitMQ.
系統設計的兩個重大問題.
第一條要滿足未來的業務需求的不斷變化和增加. 也就是可擴展性.
第二條要滿足性能的可伸縮性. 也就是可集群性…通過增加機器能處理更多的請求
第三條要解耦合.
如果不解耦合, 未來業務增加或變更的時候你還在修改3年前寫的代碼.試問你有多大的把握保證升級好系統不出問題? 如何可以寫新的代碼而不用修改老代碼所帶來的好處誰都知道…
第四條簡單易懂.
以上4條在任何一個系統中都要遵循的原則. 以前是無法做到的. 自從有了MQ以后. 這些都可以同時做到了.
以前的設計理念是把系統看作一個人,按照工作的指令從上到下的執行.
現在要建立的概念是, 把系統的各個功能看作不同的人. 人與人之間的溝通通過消息進行交流傳遞信息…
有了MQ以后把一個人的事情分給了不同的人, 分工合作所帶來的好處是專業化, 並行化. 當然也引入了一些麻煩,性能開銷多一些, 工作任務的完整性不能立即得到反饋.幸好我們可以通過最終一致性.來解決這個麻煩的問題…
下面進入正題.
第一個問題RabbitMQ是如何支持可擴展性的.

如上圖, 寄件人P是系統的一個功能模塊. 用來發送消息. 一般是在某些重要的業務狀態變更時發送消息. 例如: 新訂單產生時, 訂單已打包時, 訂單已出庫時, 訂單已發出時.
那么當事件 新訂單產生時, 我們需要把這個信息告訴誰呢? 給財務? 還是給倉庫發貨?
這個地方最大的重點是. 當事件產生時. 根本不關心. 該投遞給誰.
我只要把我的重要的信息投到這個亂七八糟的MQ系統即可. 其它人你該干嘛干嘛. 反正我的任務完成了. (有沒有甩手掌櫃的感覺..)
我只要告訴系統,我的事件屬於那一類.
例如: “某某省.某某市.某某公司.產生新訂單”
那么這個地址就屬於 投遞地址.. 至於這個地址具體投到哪個郵箱那是郵局的事情.
當然還有一些具體的訂單內容也屬於要告訴系統的內容.
那么下一個問題來了, 郵局怎么知道 你的這個消息應該投遞給誰?
參考我們現實世界中的郵寄系統.是默認的省市縣這么投遞的. 這是固定思維.
但是我們的MQ系統中不是這樣的. 是先有收件人的郵箱. (隊列Queue). MQ才能投遞. 否則就丟棄這個信息…
所以MQ系統應該先有收件人的郵箱 Queue 也就是隊列. 才能接收到信息.
再有郵局
再有發信息的人.
RabbitMQ能實現系統擴展的一個重要功能在於, 可以兩個郵箱收同一個地址的信.
翻譯成專業的話 RabbitMQ 可以 兩個隊列Queue訂閱同一個RoutingKey的信息..
RabbitMQ在投遞的時候,會把一份信息,投遞到多個隊列郵箱中Queue…
這是系統可擴展性的基礎.
第二個問題RabbitMQ如何滿足性能的可伸縮性. 也就是可集群性
先上圖

從上圖, 可以看到. 性能擴展的關鍵點就在於 訂閱人C1, 訂閱人C2 輪流收到郵箱隊列里面的信息, 訂閱人C1和訂閱人C2收到的信息內容不同, 但都屬於同一類….
所以. 訂閱人C1和訂閱人C2是干同一種工作的客戶端.用來提高處理能力.
上面說完了,如何使用. 下面再分析一下幾個關注點.
如果訂閱人的down機了. 信息會丟失嗎?
    事實上是不會的. 只要有郵箱(隊列Queue)存在.信息就一直存在, 除非訂閱人去取走.
如果訂閱人一直down機, 郵箱隊列能存多少信息?會不會爆掉?
   理論上和實際上都是有上限的不可能無限多. 具體多少看硬盤吧..我沒測到過上限.
我這篇文章並不打算講解郵局的4種投遞模式. 有其它文章講的很好. 我只打算使用topic這種模式. 因為它更靈活一些.
再說一下我的另外兩個觀點.
不要在業務程序中用代碼定義創建 郵局 ExChange. 和郵箱Queue隊列 這屬於系統設計者要構架的事情. 要有專門獨立的程序和規則去創建. 這樣可以統一管理事件類型.避免過多的亂七八糟的RoutingKey混亂.
我的理解認為
消息系統的分布式可擴展的實現在於消息廣播, 集群性的實現在於郵箱隊列.
RabbitMQ是先廣播后隊列的.
Exchange: 就是郵局的概念等同於 中國郵政和順豐快遞、
routingkey: 就是郵件地址的概念.
queue: 就是郵箱接收軟件,但是可以接收多個地址的郵件,通過bind實現。
producer: 消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
其它關於投遞模式, 請參考下面的兩篇文章.

1  RabbitMQ數據速率問題

在邊讀邊寫的情況下:速率只與網絡帶寬正相關,網絡使用率最高能達到接近100%,並且數據使用率很高(90%以上)。

        在千兆網下,以500KB一條數據為例,讀寫速率均能達到200條/s,約為100MB/s。

在只寫不讀的情況下:寫入速率瓶頸在於硬盤寫入速度。

 

2  RabbitMQ數據存儲路徑變更到D盤方法

Windows環境下,在安裝前設置環境變量:RABBITMQ_BASE=D:\RabbitMQ_Data

 

3  RabbitMQ磁盤寫滿重啟后數據丟失問題

表現:磁盤寫滿后發送、讀取程序均不能連接服務。

解決方法:將Queue、Exchange設置為Durable即不會發生數據丟失問題。

通過a.關閉服務;b.刪除占位文件、erl_crash.dump;c.重啟服務 三步操作后,磁盤會清理出10M左右空間,此時讀取數據程序便可正常工作。

正確設計的架構,應確保RabbitMQ不會發生磁盤寫滿崩潰的情況。

 

4  RabbitMQ集群

在網絡帶寬占滿的情況下,通過集群的方式解決吞吐量不足的問題需要多台效果才明顯。

假設外設吞吐率為d條/s,外設向RabbitMQ1發送的概率為r1,向RabbitMQ2發送的概率為r2,RabbitMQ1需要向RabbitMQ2轉發的概率為r3,RabbitMQ2需要向RabbitMQ1轉發的概率為r3。那么RabbitMQ1進入的吞吐率為:(r1*d + r4*r2*d) 條/s ≈ 3d/4條/s,RabbitMQ2進入的吞吐率為:(r2*d + r3*r1*d) 條/s ≈ 3d/4條/s;這樣的確比只使用一台RabbitMQ的吞吐率d條/s要求低些。

N台RabbitMQ的集群,每台的平均吞吐率為:(2N-1)d/(N*N) 條/s;N=3時,平均吞吐率為5d/9條/s;N=4時,平均吞吐率為7d/16條/s。

解決方法:多台RabbitMQ服務器提供服務,在客戶端以輪循方式訪問服務,若1台down掉則不使用此台的隊列服務,服務器之間沒有聯系,這樣N台RabbitMQ的平均吞吐率為:1d/N 條/s。具體實現可以,專寫一個用戶收發RabbitMQ消息的jar/dll,在配置文件里填寫RabbitMQ機器地址,使用輪循詢問、收發的方式,提供給應用程序以黑盒方式調用。下面提供了java版本的收發實現。

發送端sender.xml配置:

<!-- 處理器相關 -->
    <bean id="sender" class="demo.Sender">
        <property name="templates">
            <list>
                <ref bean="template1" />
            <!--     <ref bean="template2" /> -->
            </list>
        </property>
    </bean>
 
    <bean id="timeFlicker" class="demo.TimeFlicker">
        <property name="handlers">
            <list>
                <ref bean="sender" />
            </list>
        </property>
    </bean>
    <!-- 處理器相關 -->
 
    <!-- amqp配置 相關 -->
    <rabbit:connection-factory id="connectionFactory1"
        host="192.1.11.108" username="guest" password="guest" virtual-host="/" />
    <rabbit:connection-factory id="connectionFactory2"
        host="192.1.11.172" username="guest" password="guest" virtual-host="/" />
    <!-- amqp配置 相關 -->
 
    <!-- 發送相關 -->
    <rabbit:template id="template1" connection-factory="connectionFactory1"
        exchange="exchange" />
 
    <rabbit:template id="template2" connection-factory="connectionFactory2"
        exchange="exchange" />
    <!-- 發送相關 -->

 

                  說明:這里配置了兩個RabbitMQ服務器,timeFlicker的目的是過一段時間把不能服務的RabbitMQ服務器重新添加到列表中,重試發送。

                  接收端receiver.xml配置:

<!-- amqp配置 相關 -->
    <rabbit:connection-factory id="connectionFactory1"
        host="192.1.11.108" username="guest" password="guest" virtual-host="/" />
    <rabbit:connection-factory id="connectionFactory2"
        host="192.1.11.172" username="guest" password="guest" virtual-host="/" />
    <!-- amqp配置 相關 -->
 
    <!-- 監聽相關 -->
    <bean id="Recv1" class="demo.Recv1" />
    <rabbit:listener-container id="Listener1"
        connection-factory="connectionFactory1" prefetch="1" acknowledge="auto">
        <rabbit:listener ref="Recv1" method="listen"
            queue-names="queue1" />
    </rabbit:listener-container>
    <bean id="Recv2" class="demo.Recv2" />
    <rabbit:listener-container id="Listener"
        connection-factory="connectionFactory1" prefetch="1" acknowledge="auto">
        <rabbit:listener ref="Recv2" method="listen"
            queue-names="queue2" />
    </rabbit:listener-container>
    <!-- 監聽相關 -->

 

                 說明:這里監聽了兩個RabbitMQ服務器,此處不需要timeFlicker。

                 如需具體代碼可以聯系本人 。

我認為MQ丟數據的問題,主要是同步還是異步刷盤、斷電是否導致的。只要send反饋正確,確保發送被接收,receive時有反饋后才會刪除數據;同步刷盤,或異步刷盤不斷電的,就不會丟失消息,

程序對於發送反饋異常的,要記錄;MQ對於receive無反饋的,有重發機制,可能會有一條數據發送多次的情況,要在程序中剔除。

 

參考:

https://blog.csdn.net/HiBoyljw/article/details/85123099
https://blog.csdn.net/zlt995768025/article/details/81938449  
https://blog.csdn.net/qq_42629110/article/details/84965084  
https://blog.csdn.net/phker/article/details/71211895  
http://www.cnblogs.com/wgp13x/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM