中間件選型
為什么要使用消息隊列?
異步處理 - 相比於傳統的串行、並行方式,提高了系統吞吐量。
應用解耦 - 系統間通過消息通信,不用關心其他系統的處理。
流量削鋒 - 可以通過消息隊列長度控制請求量;可以緩解短時間內的高並發請求。
日志處理 - 解決大量日志傳輸。
消息通訊 - 消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊。比如實現點對點消息隊列,或者聊天室等。
消息隊列有什么優缺點?
優點:解耦、異步、削峰
解耦:
當多個系統或者模塊,互相之間的調用很復雜,維護起來很麻煩。如果這些調用是不需要直接同步調用接口的,可以用 MQ 給它異步化解耦。
A 系統產生一條數據,發送到 MQ 里面去,哪個系統需要數據自己去 MQ 里面消費。如果新系統需要數據,直接從 MQ 里消費即可;如果某個系統不需要這條數據了,就取消對 MQ 消息的消費即可。這樣下來,A 系統壓根兒不需要去考慮要給誰發送數據,不需要維護這個代碼,也不需要考慮人家是否調用成功、失敗超時等情況。
異步:
A 系統接收一個請求,需要在自己本地寫庫,還需要在 BCD 三個系統寫庫,自己本地寫庫要 3ms,BCD 三個系統分別寫庫要 300ms、450ms、200ms。最終請求總延時是 3 + 300 + 450 + 200 = 953ms,接近 1s,用戶感覺搞個什么東西,慢死了慢死了。用戶通過瀏覽器發起請求。如果使用 MQ,那么 A 系統連續發送 3 條消息到 MQ 隊列中,假如耗時 5ms,A 系統從接受一個請求到返回響應給用戶,總時長是 3 + 5 = 8ms。
削峰:
減少高峰時期對服務器壓力。
缺點:系統可用性降低、系統復雜度提高(如何保證消息不被重復消費、如何保證消息可靠性傳輸等)、數據一致性問題
使用場景有哪些?
異步通信:有些業務不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
解耦:降低工程間的強依賴程度,針對異構系統進行適配。在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。通過消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口,當應用發生變化時,可以獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
冗余:有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的”插入-獲取-刪除”范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
擴展性:因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。便於分布式擴容。
過載保護:在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量無法提取預知;如果以為了能處理這類瞬間峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
可恢復性:系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
順序保證:在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。
緩沖:在任何重要的系統中,都會有需要不同的處理時間的元素。消息隊列通過一個緩沖層來幫助任務最高效率的執行,該緩沖有助於控制和優化數據流經過系統的速度。以調節系統響應時間。
數據流處理:分布式系統產生的海量數據流,如:業務日志、監控數據、用戶行為等,針對這些數據流進行實時或批量采集匯總,然后進行大數據分析是當前互聯網的必備技術,通過消息隊列完成此類數據收集是最好的選擇。
消息中間件常用協議
AMQP協議:AMQP即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同開發語言等條件的限制。
優點:可靠、通用
MQTT協議:MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支持所有平台,幾乎可以把所有聯網物品和外部連接起來,被用來當做傳感器和致動器(比如通過Twitter讓房屋聯網)的通信協議。
優點:格式簡潔、占用帶寬小、移動端通信、PUSH、嵌入式系統
STOMP協議:STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息協議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設計的簡單文本協議。STOMP提供一個可互操作的連接格式,允許客戶端與任意STOMP消息代理(Broker)進行交互。
優點:命令模式(非topic\queue模式)
XMPP協議:XMPP(可擴展消息處理現場協議,Extensible Messaging and Presence Protocol)是基於可擴展標記語言(XML)的協議,多用於即時消息(IM)以及在線現場探測。適用於服務器之間的准即時操作。核心是基於XML流傳輸,這個協議可能最終允許因特網用戶向因特網上的其他任何人發送即時消息,即使其操作系統和瀏覽器不同。
優點:通用公開、兼容性強、可擴展、安全性高,但XML編碼格式占用帶寬大
其他基於TCP/IP自定義的協議:有些特殊框架(如:redis、kafka、zeroMq等)根據自身需要未嚴格遵循MQ規范,而是基於TCP\IP自行封裝了一套協議,通過網絡socket接口進行傳輸,實現了MQ的功能。
消息中間件對比
一般業務系統要引入MQ,最早大家都用ActiveMQ,但現在用的不多了。沒有經過大規模吞吐場景的驗證,社區也不活躍,不推薦再使用。
后來大家開始用rabbitMQ,但是它是使用erlang語言開發的,如果不精通erlang,對公司而言,幾乎處於不可控的狀態,但其是開源的,社區活躍度高,擁有比較穩定的支持。
現在越來越多的公司開始使用RocketMQ,但是要小心被拋棄的風險。如果公司有實力自己去維護開發,推薦使用。否則還是選擇RabbitMQ。
如果實在大數據的實時計算、日志采集等領域,用kafka是業界標准。
所以,對於中小型公司,技術實力一般的,應該用RabbitMQ,對於大公司,基礎架構研發能力強大的,推薦使用RocketMQ。
AMQP模型
Server(broker): 接受客戶端連接,實現AMQP消息隊列和路由功能的進程,可以理解為郵局。
Virtual Host:其實是一個虛擬概念,類似於權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以划分出多個vhost,每個用戶在自己的vhost創建exchange/queue等,就好比於tomcat中webapps目錄下可以部署多個web項目。
Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列,就好比郵遞員。
Message Queue:消息隊列,用於存儲還未被消費者消費的消息,就好比於郵箱。
Message: 由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等,就好比於郵箱里面的信件。而Body是真正需要傳輸的APP數據,就像信件里面的信紙。
Binding:Binding聯系了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,兩者的匹配方式由Exchange Type決定,就好比於郵件上面的地址。
Connection:連接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連接。
Channel:信道,僅僅創建了客戶端到Broker之間的連接后,客戶端還是不能發送消息的。需要為每一個Connection創建Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接,可以簡單的理解為線程池中的一個個線程。
實現原理
RabbitMQ基本概念有哪些?
Broker: 簡單來說就是消息隊列服務器實體
Exchange: 消息交換機,它指定消息按什么規則,路由到哪個隊列
Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列
Binding: 綁定,它的作用就是把exchange和queue按照路由規則綁定起來
Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞
VHost: vhost 可以理解為虛擬 broker ,即 mini-RabbitMQ server。其內部均含有獨立的 queue、exchange 和 binding 等,但最最重要的是,其擁有獨立的權限系統,可以做到 vhost 范圍的用戶控制。當然,從 RabbitMQ 的全局角度,vhost 可以作為不同權限隔離的手段(一個典型的例子就是不同的應用可以跑在不同的 vhost 中)。
Producer: 消息生產者,就是投遞消息的程序
Consumer: 消息消費者,就是接受消息的程序
Channel: 消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務
由Exchange、Queue、RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。
RabbitMQ交換機有哪幾種?
參考鏈接
fanout:廣播模式 隊列直接綁定該交換機(不需要指定routing key)
direct:消息路由到那些binding key與routing key完全匹配的Queue中
topic: 前面提到的direct規則是嚴格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時候才將消息傳送給Queue,那么topic這個規則就是模糊匹配,可以通過通配符滿足一部分規則就可以傳送。
headers headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。而是根據發送的消息內容中的headers屬性進行匹配。
RabbitMQ有幾種工作模式?
simple模式(即最簡單的收發模式)
一個生產者,一個消費者
消息的消費者(consumer) 監聽 消息隊列,如果隊列中有消息,就消費掉,消 息被拿走后,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經從隊列中消失了,造成消息的丟失,這里可以設置成手動的ack,但如果設置成手動ack,處理完后要及時發送ack消息給隊列,否則會造成內存溢出)。
work工作模式(資源的競爭)
一個生產者,多個消費者,每個消費者獲取到的消息唯一。
消息產生者將消息放入隊列消費者可以有多個,消費者1,消費者2同時監聽同一個隊列,消息被消費。C1 C2共同爭搶當前的消息隊列內容,誰先拿到誰負責消費消息(隱患:高並發情況下,默認會產生某一個消息被多個消費者共同使用,可以設置一個開關(syncronize) 保證一條消息只能被一個消費者使用)。
publish/subscribe發布訂閱(共享資源)
一個生產者發送的消息會被多個消費者獲取
生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息。
routing路由模式
發送消息到交換機並且要指定路由key ,消費者將隊列綁定到交換機時需要指定路由key
業務場景:error 通知;exception錯誤通知的功能;客戶通知:利用key路由,可以將程序中的錯誤封裝成消息傳入到消息隊列中,開發者可以自定義消費者,實時接收錯誤;
topic 主題模式(路由模式的一種)
將路由鍵和某模式進行匹配,此時隊列需要綁定在一個模式上,“#”匹配一個詞或多個詞,“*”只匹配一個詞
交換機根據key的規則模糊匹配到對應的隊列,由隊列的監聽消費者接收消息消費
63 在我的理解看來就是routing查詢的一種模糊匹配,就類似sql的模糊查詢方式
RabbitMQ怎么實現延遲消息隊列?
通過Rabbitmq本身隊列的特性來實現,需要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)
在rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列功能。同時插件依賴Erlang/OPT 18.0及以上
如果不使用插件,延遲消息的延遲時間是依賴於Queue1的x-message-ttl的,也就是說,需要支持多少種延遲的時間,就得提前設置好多少個無消費類的Queue,而且由於轉發綁定的Queue2需要配到交換機中,比較死板,而真實的業務中消費類肯定是不一樣的
若想不借助插件實現rabbitMQ的延遲消息,實際就是利用一個沒有消費者的Queue1,等待消息過期后,通過交換機轉發到Queue2來進行消費,消息的延遲時間就是消息在Queue1中的存活時間
RabbitMQ消息基於什么傳輸?
由於TCP連接的創建和銷毀開銷較大,且並發數受系統資源限制,會造成性能瓶頸。RabbitMQ使用信道的方式來傳輸數據。信道是建立在真實的TCP連接內的虛擬連接,且每條TCP連接上的信道數量沒有限制。
信道是一個類似於NIO(一種TCP多路復用技術)的技術:在RabbitMQ中每個生產者、消費者線程各把持一個信道,多個信道復用了同一個TCP 連接。當每個信道的流量不是很大時,復用單連接可以在產生性能瓶頸的情況下有效地節 TCP 連接資源。當信道本身的流量很大時,就會開辟多連接,將這些信道均攤到這些連接中。
RabbitMQ消息如何分發?
循環分發:默認情況下,RabbitMQ 會順序的分發每個Message。當每個收到ack后,會將該Message刪除,然后將下一個Message分發到下一個Consumer。這種分發方式叫做round-robin。
默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。當然n是取余后的。它不管Consumer是否還有unacked Message,只是按照這個默認機制進行分發。那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。
那么,RabbitMQ是如何處理這種問題呢?
通過 basic.qos 方法設置prefetch_count=1。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。
RabbitMQ消息怎么路由?
從概念上來說,消息路由必須有三部分:交換器、路由、綁定。生產者把消息發布到交換器上;綁定決定了消息如何從路由器路由到特定的隊列;消息最終到達隊列,並被消費者接收。
消息發布到交換器時,消息將擁有一個路由鍵(routing key),在消息創建時設定。
通過隊列路由鍵,可以把隊列綁定到交換器上。
消息到達交換器后,RabbitMQ會將消息的路由鍵與隊列的路由鍵進行匹配(針對不同的交換器有不同的路由規則)。如果能夠匹配到隊列,則消息會投遞到相應隊列中;如果不能匹配到任何隊列,消息將進入 “黑洞”。
常用的交換器主要分為一下三種:
direct:如果路由鍵完全匹配,消息就被投遞到相應的隊列
fanout:如果交換器收到消息,將會廣播到所有綁定的隊列上
topic:可以使來自不同源頭的消息能夠到達同一個隊列。 使用topic交換器時,可以使用通配符,比如:“*” 匹配特定位置的任意文本, “.” 把路由鍵分為了幾部分,“#” 匹配所有規則等。特別注意:發往topic交換器的消息不能隨意的設置選擇鍵(routing_key),必須是由"."隔開的一系列的標識符組成。
常見問題
如何保證消息的消費順序?
一個queue,有多個consumer去消費,這樣就會造成順序的錯誤,consumer從MQ里面讀取數據是有序的,但是每個consumer的執行時間是不固定的,無法保證先讀到消息的consumer一定先完成操作,這樣就會出現消息並沒有按照順序執行,造成數據順序錯誤。
一個queue對應一個consumer,但是consumer里面進行了多線程消費,這樣也會造成消息消費順序錯誤。
拆分多個queue,每個queue一個consumer,就是多一些queue而已,確實是麻煩點;這樣也會造成吞吐量下降,可以在消費者內部采用多線程的方式取消費。
或者就一個queue但是對應一個consumer,然后這個consumer內部用內存隊列做排隊,然后分發給底層不同的worker來處理
使用排他隊列
如何保證消息不被重復消費?
參考文檔
1.冪等性
冪等(idempotent、idempotence)是一個數學與計算機學概念,常見於抽象代數中。
在編程中一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。冪等函數,或冪等方法,是指可以使用相同參數重復執行,並能獲得相同結果的函數。這些函數不會影響系統狀態,也不用擔心重復執行會對系統造成改變。例如,“setTrue()”函數就是一個冪等函數,無論多次執行,其結果都是一樣的.更復雜的操作冪等保證是利用唯一交易號(流水號)實現.
簡單來說,冪等性就是一個數據或者一個請求,給你重復來了多次,你得確保對應的數據是不會改變的,不能出錯。
2.出現重復消費場景
(1)首先,比如rabbitmq、rocketmq、kafka,都有可能會出現消息重復消費的問題。因為這個問題通常不是由mq來保證的,而是消費方自己來保證的。
(2)舉例kafka來說明重復消費問題
kafka有一個叫做offset的概念,就是每個消息寫進去,都有一個offset代表他的序號,然后consumer消費了數據之后,每隔一段時間,會把自己消費過的消息的offset提交一下,代表我已經消費過了,下次就算重啟,kafka就會讓消費者從上次消費到的offset來繼續消費。
但是萬事總有例外,如果consumer消費了數據,還沒來得及發送自己已經消費的消息的offset就掛了,那么重啟之后就會收到重復的數據。
3.保證冪等性(重復消費)
要保證消息的冪等性,這個要結合業務的類型來進行處理。下面提供幾個思路供參考:
(1)、可在內存中維護一個set,只要從消息隊列里面獲取到一個消息,先查詢這個消息在不在set里面,如果在表示已消費過,直接丟棄;如果不在,則在消費后將其加入set當中。
(2)、如何要寫數據庫,可以拿唯一鍵先去數據庫查詢一下,如果不存在在寫,如果存在直接更新或者丟棄消息。
(3)、如果是寫redis那沒有問題,每次都是set,天然的冪等性。
(4)、讓生產者發送消息時,每條消息加一個全局的唯一id,然后消費時,將該id保存到redis里面。消費時先去redis里面查一下有么有,沒有再消費。
(5)、數據庫操作可以設置唯一鍵,防止重復數據的插入,這樣插入只會報錯而不會插入重復數據。
如何保證RabbitMQ消息的可靠傳輸?
主流做法是
發送消息前,先將消息持久化到本地(比如數據庫),防止消息發送失敗時,再次發送找不到原來的消息
設置發送方確認模式
發送消息后,將消息的狀態存入數據庫並設置為發送中
發送端設定簽收超時時間並監聽 RabbitMQ 看消費端是否有簽收
如果監聽到簽收,則將該消息的狀態改為投遞成功,否則一直等待直到超時
設置補償機制:起一個定時任務,每隔一段時間,從數據庫中找出狀態為發送中且時間超時的消息,重新投遞
如果多次重投后依然失敗,就需要人工去處理
數據的丟失問題,可能出現在生產者、MQ、消費者中:
如何確保消息正確地發送至RabbitMQ?
將 channel 設置成 confirm 模式(發送方確認模式),則所有在 channel
上發布的消息都會被指派一個唯一的ID。一旦消息被投遞到目的隊列后,或者消息被寫入磁盤后(可持久化的消息),信道會發送一個確認給生產者(包含消息唯一ID)。
如果RabbitMQ發生內部錯誤從而導致消息丟失,會發送一條nack(not acknowledged,未確認)消息。
發送方確認模式是異步的,生產者應用程序在等待確認的同時,可以繼續發送消息。當確認消息到達生產者應用程序,生產者應用程序的回調方法就會被觸發來處理確認消息
如何確保消息接收方消費了消息?
消費方接收每一條消息后都必須在消費完之后進行手動確認(消息接收和消息確認是兩個不同操作)消費端確認之后,RabbitMQ會把消息從隊列中刪除
RabbitMQ僅通過消費端的連接中斷來確認是否需要重新發送消息。也就是說,只要連接不中斷,RabbitMQ給了消費端足夠長的時間來處理消息。保證數據的最終一致性;
下面羅列幾種特殊情況:
如果消費者接收到消息,在確認之前斷開了連接或取消訂閱,RabbitMQ會認為消息沒有被分發,然后重新分發給下一個訂閱的消費者。(可能存在消息重復消費的隱患,需要去重)
如果消費者接收到消息卻沒有確認消息,連接也未斷開,則RabbitMQ認為該消費者繁忙,將不會給該消費者分發更多的消息。
如何避免消息重復投遞或重復消費?
避免消息重復投遞:在消息生產時,MQ內部針對每條生產者發送的消息生成一個inner-msg-id,作為去重的依據(消息投遞失敗並重傳),避免重復的消息進入隊列;
避免消息重復消費:
在消息消費時,要求消息體中必須要有一個唯一Id(對於同一業務全局唯一,如支付ID、訂單ID、帖子ID等)作為去重的依據,避免同一條消息被重復消費
消費消息接口做冪等處理
RabbitMQ如何持久化?
RabbitMQ 持久化包含3個部分
exchange 持久化,在聲明時指定 durable 為 true
queue 持久化,在聲明時指定 durable 為 true
message 持久化,在投遞時指定 delivery_mode=2(1是非持久化)
如果將所有的消息都進行持久化操作,這樣會嚴重影響 RabbitMQ 的性能。寫入磁盤的速度可比寫入內存的速度要慢很多。所以需要在可靠性和吞吐量之間做權衡。
將 exchange、queue 和 message 都進行持久化操作后,也不能保證消息一定不會丟失,消息存入RabbitMQ 之后,還需要一段時間才能存入硬盤。RabbitMQ 並不會為每條消息都進行同步存盤,如果在這段時間,服務器宕機或者重啟,消息還沒來得及保存到磁盤當中,就會丟失。對於這種情況,可以引入 RabiitMQ 鏡像隊列機制(一種高可用方案)。
如何保證高可用的RabbitMQ 的集群
普通集群模式,意思就是在多台機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。你創建的 queue,只會放在一個 RabbitMQ 實例上,但是每個實例都同步 queue 的元數據(元數據可以認為是 queue 的一些配置信息,通過元數據,可以找到 queue 所在實例)。你消費的時候,實際上如果連接到了另外一個實例,那么那個實例會從 queue 所在實例上拉取數據過來。這方案主要是提高吞吐量的,就是說讓集群中多個節點來服務某個 queue 的讀寫操作。
鏡像集群模式:這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通集群模式不一樣的是,在鏡像集群模式下,你創建的 queue,無論元數據還是 queue 里的消息都會存在於多個實例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整鏡像,包含 queue 的全部數據的意思。然后每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一個策略,這個策略是鏡像集群模式的策略,指定的時候是可以要求數據同步到所有節點的,也可以要求同步到指定數量的節點,再次創建 queue 的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。這樣的話,好處在於,你任何一個機器宕機了,沒事兒,其它機器(節點)還包含了這個 queue 的完整數據,別的 consumer 都可以到其它節點上去消費數據。壞處在於,第一,這個性能開銷也太大了吧,消息需要同步到所有機器上,導致網絡帶寬壓力和消耗很重!RabbitMQ 一個 queue 的數據都是放在一個節點里的,鏡像集群下,也是每個節點都放這個 queue 的完整數據。
消息積壓了怎么辦?
參考文檔
大量消息在mq里積壓了幾個小時了還沒解決:
先修復consumer的問題,確保其恢復消費速度,然后將現有consumer都停掉。
臨時建立好原先10倍或者20倍的queue數量(新建一個topic,partition是原來的10倍)。
然后寫一個臨時分發消息的consumer程序,這個程序部署上去消費積壓的消息,消費之后不做耗時處理,直接均勻輪詢寫入臨時建好分10數量的queue里面。
緊接着征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的消息。
這種做法相當於臨時將queue資源和consumer資源擴大10倍,以正常速度的10倍來消費消息。
等快速消費完了之后,恢復原來的部署架構,重新用原來的consumer機器來消費消息。
消息設置了過期時間,過期就丟了怎么辦:
批量重導:在流量低峰期(比如夜深人靜時),寫一個程序,手動去查詢丟失的那部分數據,然后將消息重新發送到mq里面,把丟失的數據重新補回來。
積壓消息長時間沒有處理,mq放不下了怎么辦:
丟棄+批量重導:首先,臨時寫個程序,連接到mq里面消費數據,收到消息之后直接將其丟棄,快速消費掉積壓的消息,降低MQ的壓力,然后走第二種方案,在晚上夜深人靜時去手動查詢重導丟失的這部分數據。
設計MQ思路
參考文檔
場景:解耦/最終一致性/廣播/錯峰流控
使用技巧
參考文檔
隊列屬性
queue : 隊列的名字
durable : 為true表示隊列中數據持久化到磁盤,可以防止mq宕機重啟數據丟失
exclusive : 為true表示排他性,只允許一個當前連接訪問該隊列,當前已連接就不允許新的連接進入否則報錯,當連接斷開當前隊列會銷毀
autoDelete : 為true表示自動刪除,當沒有Connection連接到隊列的時候,會自動刪除
arguments : 這個參數用來添加一些額外參數的,比如添加x-message-ttl為5000,則表示消息超過5秒沒被處理就會超時過期;x-expires設置120000表示隊列在2分鍾內沒被消費則被刪除;x-max-length,x-max-length-bytes表示傳送數據的最大長度和字節數x-dead-letter-exchange,x-dead-letter-routing-key表示死信交換機和死信路由,放在需要過期或處理失敗的隊列屬性中,這些數據會轉發到死信隊列存儲起來,創建普通的交換機和隊列綁定,把交換機名填到x-dead-letter-exchange的值,填寫路由鍵要符合死信隊列的路由鍵;x-max-priority,表示設置優先級,范圍為0~255,只有當消息堆積的時候,這個優先級才有意義,數字越大優先級越高;x-queue-mode當為lazy,表示惰性隊列,3.6.0之后才被引入的概念,相比默認的模式,惰性隊列模式會將生產者產生的消息直接存到磁盤中,這當然會增加IO開銷,但適合應對大量消息堆積的情況;因為當大量消息堆積時,內存也不夠存放,會將消息轉存到磁盤,這個過程也是比較耗時且過程中不能接收新的消息。如果需要將普通隊列轉換成惰性隊列需要將原來的隊列刪除,重新創建個惰性隊列綁定。
交換機屬性
exchange : 交換機名稱
type : 交換機類型
durable : 持久化,同隊列
autoDelete : 是否自動刪除,同隊列
internal : 若為true,表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。
arguments : 額外參數,目前只有個alternate-exchange,表示當生產者發送消息到這個交換機,路由不到該交換機的隊列,則會嘗試這個參數指定的交換機進行路由,若路由鍵匹配,則路由到alternate-exchange指定的隊列,相當於轉發了,剛好和上一個參數internal配合,若不想本交換機起到路由隊列的作用,可以設置internal為true,把消息都轉發到alternate-exchange指定的交換機,由該交換機來路由指定隊列,