參考:https://blog.csdn.net/youbl/article/details/80425959
0、防止消息丟失
生產者:confire模式。異步等待MQ回調通知是否接收到消息,判斷是否重發。
MQ:持久化。設置Queue持久化 + Msg持久化deliveryMode=2
消費者:手動ACK。注意:超時、死循環、Qos、冪等
1、自動ack機制會導致消息丟失的問題
自動ack消費者接到消息立即ack。將自動ack改為手動ack
2、啟用手動ack機制后,沒有及時ack導致的隊列異常(Unacked過多)
模式:成功ack,異常失敗無操作。
為解決問題1,處理消息完成后再做ack響應,失敗什么都不做,這樣消息會儲存在MQ的Unacked消息里不會丟失。
但是如果RabbitMQ沒有得到ack響應,這些消息會堆積在Unacked消息里,不會拋棄。
而且若Consumer不斷開連接,這些Unacked消息也不會變回ready的消息,不能重新推送。
3 啟用nack+重入隊 機制后,導致的死循環(Ready過多)
模式:成功ack,異常失敗nack+ requeue=true
為解決問題2,在處理消息異常時nack。正常就ack,失敗就nack消息重新入隊首,並等下一次重新消費。
但是新問題現象是Ready的消息猛增,一直不見減少。原因是出異常后,把消息塞回隊列頭部,下一步又消費這條會出異常的 消息,又出錯塞回隊列頭部……進入了死循環了,新來的消息不會被消費,導致堆積了
可以考慮通過給隊列綁定死信交換機DLX(basic.reject/basic.nack 且 requeue=false 消息進入綁定的DLX),保存並處理異常或多次重試的信息。
https://blog.csdn.net/shanchahua123456/article/details/84324059
方案1:
避免反復重試的方法,在消費消息時先在redis中記入 消息ID:count +TTL,下次再消費時判斷redis中次數,超次則不重入隊,記錄失敗消息內容或轉發其他MQ或進入DLX。這樣保證在一定時間內同一消息不會反復消費失敗,避免死循環阻塞,且避免消息丟失等待補償機制。補償機制也可以參考redis中 消息ID:count 判斷是否再次補償
4、啟用Qos和ack機制后,沒有及時ack導致的隊列堵塞;
// 啟用QoS,每次預取5條消息,避免消息處理不過來,全部堆積在本地緩存里
channel.BasicQos(0, 5, false);
開啟QoS,當RabbitMQ的隊列達到5條Unacked消息時,不會再推送消息給Consumer;
參考上邊解決沒有及時ack
5 消費者要注意冪等性設計,以防重復消費
出現重發的情況
1.代碼邏輯/補償機制等情況可能出現消息重發。
2.消費者斷開連接時Unacked的消息會再次入隊,但是重入隊的消息可能已經被消費 。
3.nack重入隊
首要前提消息要有唯一標示
a.根據ID判斷是否重復消費(可以用Redis+lua保證原子性的判斷更新)
b.多次重復執行,結果一致
c.利用數據庫主鍵/唯一鍵
d.update+where 條件樂觀鎖執行
6 提高消費者效率
增加消費者單機監聽線程數,直接增加消費者進程數
7 消息堆積處理
7.1 線上消費者出現BUG,或消費邏輯過慢
一般手動AKC+重入隊會造成消息堆積,自動ACK雖然不會堆積,但是可能造成消息丟失。
方法一:若時間來得及可以修復消費者BUG,並部署更多的消費者一起消費快速將堆積的消息落庫。
方法二:若來不及修復,此時可以部署幾台新消費者,其工作只從隊列里讀取數據,不執行業務邏輯直接把消息轉存到新的MQ中(或緩存中),使原業務MQ恢復暢通避免MQ被撐爆。再部署幾倍量的無bug的消費者去消費新隊列中的消息,快速將堆積的消息落庫
注意:
1 為消息設置TTL時,最好綁定死信隊列,避免消息超時丟失。
2 有序性問題
3 自動ACK丟失補償問題
7.2 避免觸發流控機制,導致不接收生產者消息。
服務端默認配置是當內存使用達到40%,磁盤空閑空間小於50M,即啟動內存報警,磁盤報警;報警后服務端觸發流控(flow control)機制。一般地,當發布端發送消息速度快於訂閱端消費消息的速度時,隊列中堆積了大量的消息,導致報警,就會觸發流控機制。
如下邊流程圖所示,發送消息后MQ首先確認消息堆積情況。MQ會持續從隊列中取出堆積的消息將其發送出去,直到觸發圖中分支能返回接收新信息。否則MQ就會持續的發送堆積消息,不去處理新來的消息,在流控機制的作用下,發送端就被阻塞了
7.3避免觸發流控應對措施:
- 加消費機器,加消費線程。
- 打破發送循環條件。如上圖中返回接收新消息的分支。
- 設置合適的qos值,當qos值被用光,而新的ack未被mq接收時,就可以跳出發送循環,去接收新的消息。
- 消息者到主動block接收進程,消費者感知到接收消息的速度過快時,主動block,利用block與unblock方法調節接收速率。當接收進程被block時,mq跳出發送循環。
- 建立新的隊列:若服務器cpu資源有較多剩余,而又不需要保證消息的順序的情況下可以通過建立新的vhost,在該vhost下創建queue,生產者將消息發送掉新的queue,消費者同時訂閱新舊queue。
- 使用緩存:在生產者端使用緩存,當生產速率受到流控限制時,緩存數據。在堆積的消息被處理完后,生產速率恢復正常時,此時將緩存的數據發送給MQ。
- 更新rabbitmq版本,在新版2.8.4中,在有大量消息堆積時,生產速率會受到抑制,但生產者不會完全被阻塞。
8 任務間相互依賴和有序性
相互依賴:
相互依賴的任務一定要注意入隊順序,或是分配到不同隊列中防止阻塞。
例如:B依賴A先執行,但是A在B之后入隊。
有序性:
比如同步數據庫操作binlog順序不能錯。或update狀態時先后順序不能亂。
一個queue多個消費者時就可能出現執行順序混亂。
解決:保證queue一對一消費。若想要提高消費能力可以考慮設計規則拆分隊列,但是每個隊列只有一個消費者。
9 消息持久化
注意隊列持久化與消息持久化不同,隊列持久化是指保存隊列信息,重啟后自動創建和綁定。但是只有隊列持久化是無法恢復消息的。需要設置消息持久化。
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
RabbitMQ在兩種情況下會將消息寫入磁盤:
消息本身在publish發布的時候就要求消息寫入磁盤;
內存緊張,需要將部分內存中的消息轉移到磁盤;
寫入文件前會有一個Buffer,大小為1M(1048576),數據在寫入文件時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤); 有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每隔25ms,Buffer里的數據及未刷新到磁盤的文件內容必定會刷到磁盤; 每次消息寫入后,如果沒有后續寫入請求,則會直接將已寫入的消息刷到磁盤。
10 高可用
普通集群模式:只是增加吞吐量,不保證高可用,不推薦
鏡像集群模式:保證高可用,queue內容會拷貝到各節點服務器中。支持任一節點生產+消費。
缺點:非分布式,每個節點都有全部拷貝。
可在控制台創建鏡像同步策略,創建queue時綁定策略
https://blog.csdn.net/winy_lm/article/details/81128181#t2
mirror queue內部有一套選舉算法,會選出一個master,和若干個slaver。master和slaver 通過相互間不斷發送心跳來檢查是否連接斷開。可以通過指定net_ticktime來控制心跳檢查頻率。
consumer,任意連接一個節點,若連上的不是master,請求會轉發給master,為了保證消息的可靠性,consumer回復ack給master后,master刪除消息並廣播所有的slaver去刪除。
publisher ,任意連接一個節點,若連上的不是master,則轉發給master,由master存儲並轉發給其他的slaver存儲。
如果master掛掉,則從slaver中選擇消息隊列最長的為master,在這種情況下可以存在消息未同步給ack消息未同步的情況,會造成消息重發(默認是異步同步的)。總共有以下幾件事情發生:
1)1個最老的(隊列最長的)的slaver提升為master,如果沒有一個slaver是和master同步的則會造成消息丟失。
2) 要提升為master的slaver會認為以前所有連接掛掉的master的消費者都斷開了連接。那么存在clinet發送了ack的消息單還在路上是master掛掉的情況,或者master收到了ack但是在廣播給slaver的時候master掛掉的情況,所以新的master別無選擇,只能認為消息沒有被確認。他會requeue他認為沒有ack的消息。那么client可能就收到了重復的消息,並要再次發送ack。
3)從鏡像隊列中消費的client支持了consumer Cancellation通知的,將收到通知並訂閱的mirrored-queue被取消了,這是因為該mirrored-queue 升級成了master,這是client需要重現去找mirrored-queue上消費,這樣就避免了client繼續發送ack到老的掛掉的master上。避免收到新的master發送的相同的消息。
4)如果noAck=true,且在mirrored-queue上消費,那么在切換時由於服務器是先ack然后發送到noAck=true的消費者,這時連接斷開可能導致該數據丟失
如果slaver掛掉,則集群的節點狀態沒有任何變化。只要client沒有連到這個節點上,也不會給client發送失敗的通知。在檢測到slaver掛掉的期間publish消息會有延遲。如果配置了高可用策略是自動同步,當slaver起來后,隊列中有大量的消息需要同步,將會整個集群阻塞長時間的不能讀寫直到同步結束。
這兩個掛掉的情況都需要客戶端鏡像容錯,比如在連接斷開的時候進行重連(官方的Java和.net 客戶端提供了callback方法在監聽到鏈接失敗的時候調用。Java在Connection和channel類中提供了ShutdownListener 的callback方法,.net client在IConnecton中提供了ConnectionShuedown在Imodel中提供了ImodelShutdown事件供調用) 。也可以在client和server之間加入LoadBalancer.比如haproxy做負載均衡。
11 RabbitMQ的用戶角色分類
none、management、policymaker、monitoring、administrator
none
不能訪問 management plugin
management
用戶可以通過AMQP做的任何事外加:
列出自己可以通過AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和關閉自己的channels 和 connections
查看有關自己的virtual hosts的“全局”的統計信息,包含其他用戶在這些virtual hosts中的活動。
policymaker
management可以做的任何事外加:
查看、創建和刪除自己的virtual hosts所屬的policies和parameters
monitoring
management可以做的任何事外加:
列出所有virtual hosts,包括他們不能登錄的virtual hosts
查看其他用戶的connections和channels
查看節點級別的數據如clustering和memory使用情況
查看真正的關於所有virtual hosts的全局的統計信息
administrator
policymaker和monitoring可以做的任何事外加:
創建和刪除virtual hosts
查看、創建和刪除users
查看創建和刪除permissions
關閉其他用戶的connections
12 控制台使用
a.查看隊列中的消息,進入對應隊列的詳情頁,選擇get messages。可以設置ack模式,編碼格式,查看消息量。
如果只想查看請選擇NACK模式。
13 創建channel過多異常
默認一個RabbitTemplate在RabbitMQ中相當於一個connection,每發送一次消息相當於channel,MQ確認收到消息后釋放channel。每個connection最多支持2048個channel,從一個connection同時超過2048個線程並發發送,channel超過2048,會報異常org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
后台管理頁面查看connection+channel
10個線程並發發送10個消息,監控到10個channel生成,接收完成后釋放channel。如果發送方是publisher-confirms模式,channel會保持到confirm回調完成再釋放,影響並發性能。每個connection最多支持2048個channel。
測試啟動publisher-confirms后,500個線程並發發送,部分消息報AmqpResourceNotAvailableException。400個線程通過一個RabbitTemplate並發發送10000消息,最高同時就可能產生1000多的channel。因為channel在等待執行confirm回調。10000消息全部發送在幾秒內完成,10000消息全部confirm回調完成用時22秒,此時所有channel全部釋放。