[Erlang 0080] RabbitMQ :VHost,Exchanges, Queues,Bindings and Channels


  和RabbitMQ這個項目的緣分好奇怪,很長一段時間內是只關注源代碼,真的是Erlang開源項目中的典范;現在要在項目中應用RabbitMQ,從新的視角切入,全新的感覺.仿佛舊情人換了新衣,雖是熟稔卻有不曾領略的風情. RabbitMQ提供了一整套機制來處理消息的發送,接收,容錯,管理,上一篇文章中我提到了一篇Rabbits and warrens的文章,是一篇非常棒的入門文章,但是里面忽略了不少細節,我沿着RabbitMQ in Action重新梳理了一遍,筆記於此,備忘.

    

Exchanges, queues, and bindings

 
   exchanges, queues, and bindings是三個基礎的概念, 他們的作用是: exchanges are where producers publish their messages,  queues are where the messages end up and are received by consumers, and  bindings are how the messages get routed from the exchange to particular queues. 
  
下面我們用一副簡單的思維導圖把上面的概念組織起來:
 
 
   上面還提到了一個vhost的概念,vhost是為了組織exchanges, queues, and bindings提出的概念,我們就從它開始講起:
 

VHost

 
   Vhosts也是AMQP的一個基礎概念,連接到RabbitMQ默認就有一個名為"/"的vhost可用,本地調試的時候可以直接使用這個默認的vhost.這個"/"的訪問可以使用guest用戶名(密碼guest)訪問.可以使用rabbitmqctl工具修改這個賬戶的權限和密碼,這在生產環境是必須要關注的. 出於安全和可移植性的考慮,一個vhost內的exchange不能綁定到其他的vhost.
 
    可以按照業務功能組來規划vhost,在集群環境中只要在某個節點創建vhost就會在整個集群內的節點都創建該vhost.VHost和權限都不能通過AMQP協議創建,在RabbitMQ中都是使用rabbitmqctl進行創建,管理.
 
如何創建vhost   
    vhost和permission(權限)信息是並不是通過AMQP創建而是通過rabbitmqctl工具來添加,管理的.
 
說完vhost我們就來看看重中之重的消息:Message
 

Message

 
   消息由兩部分組成:  payload and  label. "payload"是實際要傳輸的數據,至於數據的格式RabbitMQ並不關心,"label"描述payload,包括exchange name 和可選的topic tag.消息一旦到了consumer那里就只有payload部分了,label部分並沒有帶過來.RabbitMQ並不告訴你消息是誰發出的.這好比你收到一封信但是信封上是空白的.當然想知道是誰發的還是有辦法的,在消息內容中包含發送者的信息就可以了.
  
   消息的consumer和producer對應的概念是sending和receiving並不對應client和server.通過channel我們可以創建很多並行的傳輸 TCP鏈接不再成為瓶頸,我們可以把RabbitMQ當做應用程序級別的路由器.
 
 
Consumer消息的接收方式
     Consumer有兩種方式接收消息:
     通過 basic.consume 訂閱隊列.channel將進入接收模式直到你取消訂閱.訂閱模式下Consumer只要上一條消息處理完成(ACK或拒絕),就會主動接收新消息.如果消息到達queue就希望得到盡快處理,也應該使用basic.consume命令.
     還有一種情況,我們不需要一直保持訂閱,只要使用basic.get命令主動獲取消息即可.當前消息處理完成之后,繼續獲取消息需要主動執行basic.get 不要"在循環中使用basic.ge"t當做另外一種形式的basic.consume,因為這種做法相比basic.consume有額外的成本:basic.get本質上就是先訂閱queue取回一條消息之后取消訂閱.Consumer吞吐量大的情況下通常都會使用basic.consume.
 
 
要是沒有Consumer怎么辦?
 
     如果消息沒有Consumer就會老老實實呆在隊列里面.
 
多個Consumer訂閱同一個隊列
 
    只要Consumer訂閱了queue,消息就會發送到該Consumer.我們的問題是這種情況下queue中的消息是如何分發的?
    如果一個rabbit queue有多個consumer,具體到隊列中的某條消息只會發送到其中的一個Consumer.
 
消息確認
   
    所有接收到的消息都要求發送響應消息(ACK).這里有兩種方式一種是Consumer使用basic.ack明確發送ACK,一種是訂閱queue的時候指定auto_ack為true,這樣消息一到Consumer那里RabbitMQ就會認為消息已經得到ACK.
   要注意的是這里的響應和消息的發送者沒有絲毫關系,ACK只是Consumer向RabbitMQ確認消息已經正確的接收到消息,RabbitMQ可以安全移除該消息,僅此而已.
 
沒有正確響應怎么辦
 
    如果Consumer接收了一個消息就還沒有發送ACK就與RabbitMQ斷開了,RabbitMQ會認為這條消息沒有投遞成功會重新投遞到別的Consumer.如果你的應用程序崩掉了,你可以設置備用程序來繼續完成消息的處理.
   如果Consumer本身邏輯有問題沒有發送ACK的處理,RabbitMQ不會再向該Consumer發送消息.RabbitMQ會認為這個Consumer還沒有處理完上一條消息,沒有能力繼續接收新消息.我們可以善加利用這一機制,如果需要處理過程是相當復雜的,應用程序可以延遲發送ACK直到處理完成為止.這可以有效控制應用程序這邊的負載,不致於被大量消息沖擊.
 
 
拒絕消息
 
    由於要拒絕消息,所以ACK響應消息還沒有發出,所以這里拒絕消息可以有兩種選擇:
    1.Consumer直接斷開RabbitMQ 這樣RabbitMQ將把這條消息重新排隊,交由其它Consumer處理.這個方法在RabbitMQ各版本都支持.這樣做的壞處就是連接斷開增加了RabbitMQ的額外負擔,特別是consumer出現異常每條消息都無法正常處理的時候.
   2. RabbitMQ 2.0.0可以使用 basic.reject 命令,收到該命令RabbitMQ會重新投遞到其它的Consumer.如果設置requeue為false,RabbitMQ會直接將消息從queue中移除.
   其實還有一種選擇就是直接忽略這條消息並發送ACK,當你明確直到這條消息是異常的不會有Consumer能處理,可以這樣做拋棄異常數據.為什么要發送basic.reject消息而不是ACK?RabbitMQ后面的版本可能會引入"dead letter"隊列,如果想利用dead letter做點文章就使用basic.reject並設置requeue為false.
  
 
消息持久化
    消息的持久化需要在消息投遞的時候設置delivery mode值為2.由於消息實際存儲於queue之中,"皮之不存毛將焉附"邏輯上,消息持久化同時要求exchange和queue也是持久化的.這是消息持久化必須滿足的三個條件. 
     持久化的代價就是性能損失,磁盤IO遠遠慢於RAM(使用SSD會顯著提高消息持久化的性能) , 持久化會大大降低RabbitMQ每秒可處理的消息.兩者的性能差距可能在10倍以上.
 
消息恢復
   consumer從durable queue中取回一條消息之后並發回了ACK消息,RabbitMQ就會將其標記,方便后續垃圾回收.如果一條持久化的消息沒有被consumer取走,RabbitMQ重啟之后會自動重建exchange和queue(以及bingding關系),消息通過持久化日志重建再次進入對應的queues,exchanges.
 
皮之不存,毛將焉附?緊接着我們看看消息實際存放的地方:Queue

Queue

 
  Queues是Massage的落腳點和等待接收的地方,消息除非被扔進黑洞否則就會被安置在一個Queue里面.Queue很適合做負載均衡,RabbitMQ可以在若干consumer中間實現輪流調度(Round-Robin).
 
如何創建隊列
   consumer和producer都可以創建Queue,如果consumer來創建,避免consumer訂閱一個不存在的Queue的情況,但是這里要承擔一種風險:消息已經投遞但是consumer尚未創建隊列,那么消息就會被扔到黑洞,換句話說消息丟了;避免這種情況的好辦法就是producer和consumer都嘗試創建一下queue. 如果consumer在已經訂閱了另外一個Queue的情況下無法完成新Queue的創建,必須取消之前的訂閱將Channel置為傳輸模式("transmit")才能創建新的Channel.
   創建Queue的時候通常要指定名字,名字方便consumer訂閱.即使你不指定Rabbit會給它分配一個隨機的名字,這在使用臨時匿名隊列完成RPC-over-AMQP調用時會非常有用.
   創建Queue的時候還有兩個非常有用的選項:
   exclusive—When set to true, your queue becomes private and can only be consumed by your app. This is useful when you need to limit a queue to only one consumer.
   auto-delete—The queue is automatically deleted when the last consumer unsubscribes.
 
   如果要創建只有一個consumer使用的臨時queue可以組合使用auto-delete和 exclusive.consumer一旦斷開連接該隊列自動刪除.
   重復創建Queue會怎樣?如果Queue創建的選項完全一致的話,RabbitMQ直接返回成功,如果名稱相同但是創建選項不一致就會返回創建失敗.如果是想檢查Queue是否存在,可以設置queue.declare命令的passive 選項為true:如果隊列存在就會返回成功,如果隊列不存在會報錯且不會執行創建邏輯.
 
消息是如何從動態路由到不同的隊列的?這就看下面的內容了
 

bindings and exchanges

 
消息如何發送到隊列
 
     消息是如何發送到隊列的?這就要說到AMQP  bindings and exchanges. 投遞消息到queue都是經由exchange完成的,和生活中的郵件投遞一樣也需要遵循一定的規則,在RabbitMQ中規則是通過routing key把queue綁定到exchange上,這種綁定關系即binding.消息發送到RabbitMQ都會攜帶一個routing key(哪怕是空的key),RabbitMQ會根據bindings匹配routing key,如果匹配成功消息會轉發到指定Queue,如果沒有匹配到queue消息就會被扔到黑洞.
 
如何發送到多個隊列
 
  消息是分發到多個隊列的?AMQP協議里面定義了幾種不同類型的exchange:direct, fanout, topic, and headers. 每一種都實現了一種 routing 算法. header的路由消息並不依賴routing key而是去匹配AMQP消息的header部分,這和下面提到的direct exchange如出一轍,但是性能要差很多,在實際場景中幾乎不會被用到.
 
direct exchange   routing key完全匹配才轉發
fanout exchange  不理會routing key,消息直接廣播到所有綁定的queue 
topic exchange  對routing key模式匹配
 
 
exchange持久化
 
  創建queue和exchange默認情況下都是沒有持久化的,節點重啟之后queue和exchange就會消失,這里需要特別指定queue和exchange的durable屬性.
 
 
Consumer是直接創建TCP鏈接到RabbitMQ嗎?下面就是答案:
 

Channel

 
    無論是要發布消息還是要獲取消息 ,應用程序都需要通過TCP連接到RabbitMQ.應用程序連接並通過權限認證之后就要創建Channel來執行AMQP命令.Channel是建立在實際TCP連接之上通信管道,這里之所以引入channel的概念而不是直接通過TCP鏈接直接發送AMQP命令,是出於兩方面的考慮:建立上成百上千的TCP鏈接,一方面浪費了TCP鏈接,一方面很快會觸及系統瓶頸.引入了Channel之后多個進程與RabbitMQ的通信可以在一條TCP鏈接上完成.我們可以把TCP類比做光纜,那么Channel就像光纜中的一根根光纖.
 

參考資料

 
 
------------ 休息的分隔線 --------------
 
各位晚安,送上小圖一張
 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM