什么是MQ?
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。
RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。
1、隊列、生產者、消費者
隊列是RabbitMQ的內部對象,用於存儲消息。生產者(下圖中的P)生產消息並投遞到隊列中,消費者(下圖中的C)可以從隊列中獲取消息並消費。
多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息並處理。
2、Exchange、Binding
剛才我們看到生產者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),再通過Binding將Exchange與Queue關聯起來。
3、Exchange Type、Bingding key、routing key
在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。
RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。
附上一張RabbitMQ的結構圖:
最后來具體解析一下幾個問題:
1、可以自動創建隊列,也可以手動創建隊列,如果自動創建隊列,那么是誰負責創建隊列呢?是生產者?還是消費者?
如果隊列不存在,當然消費者不會收到任何的消息。但是如果隊列不存在,那么生產者發送的消息就會丟失。所以,為了數據不丟失,消費者和生產者都可以創建隊列。那么如果創建一個已經存在的隊列呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是隊列屬性並不會改變。
隊列對於負載均衡的處理是完美的。對於多個消費者來說,RabbitMQ使用輪詢的方式均衡的發送給不同的消費者。
2、RabbitMQ的消息確認機制
默認情況下,如果消息已經被某個消費者正確的接收到了,那么該消息就會被從隊列中移除。當然也可以讓同一個消息發送到很多的消費者。
如果一個隊列沒有消費者,那么,如果這個隊列有數據到達,那么這個數據會被緩存,不會被丟棄。當有消費者時,這個數據會被立即發送到這個消費者,這個數據被消費者正確收到時,這個數據就被從隊列中刪除。
那么什么是正確收到呢?通過ack。每個消息都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:
RabbitMQ Server會把這個信息發送到下一個消費者。
如果這個app有bug,忘記了ack,那么RabbitMQServer不會再發送數據給它,因為Server認為這個消費者處理能力有限。
而且ack的機制可以起到限流的作用(Benefitto throttling):在消費者處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的均衡消費者的負載。
1.背景
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現。
2.應用場景
2.1異步處理
場景說明:用戶注冊后,需要發注冊郵件和注冊短信,傳統的做法有兩種1.串行的方式;2.並行的方式
(1)串行方式:將注冊信息寫入數據庫后,發送注冊郵件,再發送注冊短信,以上三個任務全部完成后才返回給客戶端。 這有一個問題是,郵件,短信並不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西.
(2)並行方式:將注冊信息寫入數據庫后,發送郵件的同時,發送短信,以上三個任務完成后,返回給客戶端,並行的方式能提高處理的時間。
假設三個業務節點分別使用50ms,串行方式使用時間150ms,並行使用時間100ms。雖然並性已經提高的處理時間,但是,前面說過,郵件和短信對我正常的使用網站沒有任何影響,客戶端沒有必要等着其發送完成才顯示注冊成功,英愛是寫入數據庫后就返回.
(3)消息隊列
引入消息隊列后,把發送郵件,短信不是必須的業務邏輯異步處理
由此可以看出,引入消息隊列后,用戶的響應時間就等於寫入數據庫的時間+寫入消息隊列的時間(可以忽略不計),引入消息隊列后處理后,響應時間是串行的3倍,是並行的2倍。
2.2 應用解耦
場景:雙11是購物狂節,用戶下單后,訂單系統需要通知庫存系統,傳統的做法就是訂單系統調用庫存系統的接口.
這種做法有一個缺點:
- 當庫存系統出現故障時,訂單就會失敗。(這樣馬雲將少賺好多好多錢^ ^)
-
訂單系統和庫存系統高耦合.
引入消息隊列 -
訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
- 庫存系統:訂閱下單的消息,獲取下單消息,進行庫操作。
就算庫存系統出現故障,消息隊列也能保證消息的可靠投遞,不會導致消息丟失(馬雲這下高興了).
流量削峰
流量削峰一般在秒殺活動中應用廣泛
場景:秒殺活動,一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入消息隊列。
作用:
1.可以控制活動人數,超過此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒有成功過呢^^)
2.可以緩解短時間的高流量壓垮應用(應用程序按自己的最大處理能力獲取訂單)
1.用戶的請求,服務器收到之后,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接拋棄用戶請求或跳轉到錯誤頁面.
2.秒殺業務根據消息隊列中的請求信息,再做后續處理.
3.系統架構
幾個概念說明:
Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息的載體,每個消息都會被投到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權限分離。
Producer:消息生產者,就是投遞消息的程序.
Consumer:消息消費者,就是接受消息的程序.
Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
4.任務分發機制
4.1Round-robin dispathching循環分發
RabbbitMQ的分發機制非常適合擴展,而且它是專門為並發程序設計的,如果現在load加重,那么只需要創建更多的Consumer來進行任務處理。
4.2Message acknowledgment消息確認
為了保證數據不被丟失,RabbitMQ支持消息確認機制,為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack,而應該是在處理完數據之后發送ack.
在處理完數據之后發送ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以安全的刪除它了.
如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer,這樣就保證在Consumer異常退出情況下數據也不會丟失.
RabbitMQ它沒有用到超時機制.RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有正確處理,也就是說RabbitMQ給了Consumer足夠長的時間做數據處理。
如果忘記ack,那么當Consumer退出時,Mesage會重新分發,然后RabbitMQ會占用越來越多的內存.
5.Message durability消息持久化
要持久化隊列queue的持久化需要在聲明時指定durable=True;
這里要注意,隊列的名字一定要是Broker中不存在的,不然不能改變此隊列的任何屬性.
隊列和交換機有一個創建時候指定的標志durable,durable的唯一含義就是具有這個標志的隊列和交換機會在重啟之后重新建立,它不表示說在隊列中的消息會在重啟后恢復
消息持久化包括3部分
1. exchange持久化,在聲明時指定durable => true
hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的
- 1
2.queue持久化,在聲明時指定durable => true
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的
- 1
3.消息持久化,在投遞時指定delivery_mode => 2(1是非持久化).
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
- 1
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的,如果exchange和queue兩者之間有一個持久化,一個非持久化,則不允許建立綁定.
注意:一旦創建了隊列和交換機,就不能修改其標志了,例如,創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現創建。
6.Fair dispath 公平分發
你可能也注意到了,分發機制不是那么優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。n是取余后的,它不管Consumer是否還有unacked Message,只是按照這個默認的機制進行分發.
那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻毫無休息的機會,那么,Rabbit是如何處理這種問題呢?
通過basic.qos方法設置prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它
channel.basic_qos(prefetch_count=1)
- 1
注意,這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。
7.分發到多個Consumer
7.1Exchange
先來溫習以下交換機路由的幾種類型:
Direct Exchange:直接匹配,通過Exchange名稱+RountingKey來發送與接收消息.
Fanout Exchange:廣播訂閱,向所有的消費者發布消息,但是只有消費者將隊列綁定到該路由器才能收到消息,忽略Routing Key.
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個詞,只有消息這將隊列綁定到該路由器且指定RoutingKey符合匹配規則時才能收到消息;
Headers Exchange:消息頭訂閱,消息發布前,為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息同時需要定義類似的鍵值對請求頭:(如:x-mactch=all或者x_match=any),只有請求頭與消息頭匹配,才能接收消息,忽略RoutingKey.
默認的exchange:如果用空字符串去聲明一個exchange,那么系統就會使用”amq.direct”這個exchange,我們創建一個queue時,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去
channel.BasicPublish("", "TaskQueue", properties, bytes);
- 1
因為在第一個參數選擇了默認的exchange,而我們申明的隊列叫TaskQueue,所以默認的,它在新建一個也叫TaskQueue的routingKey,並綁定在默認的exchange上,導致了我們可以在第二個參數routingKey中寫TaskQueue,這樣它就會找到定義的同名的queue,並把消息放進去。
如果有兩個接收程序都是用了同一個的queue和相同的routingKey去綁定direct exchange的話,分發的行為是負載均衡的,也就是說第一個是程序1收到,第二個是程序2收到,以此類推。
如果有兩個接收程序用了各自的queue,但使用相同的routingKey去綁定direct exchange的話,分發的行為是復制的,也就是說每個程序都會收到這個消息的副本。行為相當於fanout類型的exchange。
下面詳細來說:
7.2 Bindings 綁定
綁定其實就是關聯了exchange和queue,或者這么說:queue對exchange的內容感興趣,exchange要把它的Message deliver到queue。
7.3Direct exchange
Driect exchange的路由算法非常簡單:通過bindingkey的完全匹配,可以用下圖來說明.
Exchange和兩個隊列綁定在一起,Q1的bindingkey是orange,Q2的binding key是black和green.
當Producer publish key是orange時,exchange會把它放到Q1上,如果是black或green就會到Q2上,其余的Message被丟棄.
7.4 Multiple bindings
多個queue綁定同一個key也是可以的,對於下圖的例子,Q1和Q2都綁定了black,對於routing key是black的Message,會被deliver到Q1和Q2,其余的Message都會被丟棄.
7.5 Topic exchange
對於Message的routing_key是有限制的,不能使任意的。格式是以點號“.”分割的字符表。比如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。
對於routing_key,有兩個特殊字符
- *(星號)代表任意一個單詞
-
#(hash)0個或多個單詞
Producer發送消息時需要設置routing_key,routing_key包含三個單詞和連個點號o,第一個key描述了celerity(靈巧),第二個是color(色彩),第三個是物種:
在這里我們創建了兩個綁定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:- Q1感興趣所有orange顏色的動物
- Q2感興趣所有rabbits和所有的lazy的.
例子:rounting_key 為 “quick.orange.rabbit”將會發送到Q1和Q2中
rounting_key 為”lazy.orange.rabbit.hujj.ddd”會被投遞到Q2中,#匹配0個或多個單詞。
8.消息序列化
RabbitMQ使用ProtoBuf序列化消息,它可作為RabbitMQ的Message的數據格式進行傳輸,由於是結構化的數據,這樣就極大的方便了Consumer的數據高效處理,當然也可以使用XML,與XML相比,ProtoBuf有以下優勢:
1.簡單
2.size小了3-10倍
3.速度快了20-100倍
4.易於編程
6.減少了語義的歧義.
,ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛