RabbitMQ框架構建系列(一)——AMPQ協議


一、MQ

  在介紹RabbitMq之前,先來說一下MQ。什么是MQ?MQ全稱為Message Queue即消息隊列,就是一個消息的容器, MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取或者訂閱隊列中的消息。MQ框架非常之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka。根據自己項目的業務場景和需求來選擇相應的MQ框架(MQ框架比較)。為什么要使用MQ呢?在項目中,一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。比如在高並發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導致無數的行鎖表鎖,甚至最后請求會堆積過多,從而觸發too many connections錯誤。通過使用消息隊列,我們可以異步處理請求,從而緩解系統的壓力。

二、RabbitMQ

  在MQ眾多框架中RabbitMQ仍然是首選,RabbitMQ用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現的可復用的企業消息系統。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。所以我先學習一下AMQP協議。

三、AMQP協議

  AMQP協議參考和轉載自https://blog.csdn.net/yinwenjie/article/details/50820369寫的非常好。

  1、AMQP協議的各個組成部分

 

    (1)AMQP協議的各個組成部分:AMQP協議中的元素包括:Message(消息體)、Producer(消息生產者)、Consumer(消息消費者)、Virtual Host(虛擬節點)、Exchange(交換機)、Queue(隊列)等

    (2)由Producer(消息生產者)和Consumer(消息消費者)構成了AMQP的客戶端,他們是發送消息和接收消息的主體。AMQP服務端稱為Broker,一個Broker中一定包含完整的Virtual Host(虛擬主機)、 Exchange(交換機)、Queue(隊列)定義。

    (3)一個Broker可以創建多個Virtual Host(虛擬主機),Virtual Host的工作元素有Exchange和Queue。注意,如果AMQP是由多個Broker構成的集群提供服務,那么一個Virtual Host也可以由多個Broker共同構成。

    (4)Connection是由Producer(消息生產者)和Consumer(消息消費者)創建的連接,連接到Broker物理節點上。但是有了Connection后客戶端還不能和服務器通信,在Connection之上客戶端會創建Channel,連接到Virtual Host或者Queue上,這樣客戶端才能向Exchange發送消息或者從Queue接受消息。一個Connection上允許存在多個Channel,只有Channel中能夠發送/接受消息。

    (5)Exchange元素是AMQP協議中的交換機,Exchange可以綁定多個Queue也可以同時綁定其他Exchange。消息通過Exchange時,會按照Exchange中設置的Routing(路由)規則,將消息發送到符合的Queue或者Exchange中。

  2、AMQP消息在這個結構中是如何通過Producer發出,又經過Broker最后到達Consumer的呢

    

 

     (1)在Producer(消息生產者)客戶端建立了Channel后,就建立了到Broker上Virtual Host的連接。接下來Producer就可以向這個Virtual Host中的Exchange發送消息了。

     (2)Exchange(交換機)能夠處理消息的前提是:它至少已經和某個Queue或者另外的Exchange形成了綁定關系,並設置好了到這些Queue和Excahnge的Routing(路由規則)。Excahnge中的Routing有三種模式。在Exchange收到消息后,會根據設置的Routing(路由規則),將消息發送到符合要求的Queue或者Exchange中(路由規則還會和Message中的Routing Key屬性配合使用)。

    (3)Queue收到消息后,可能會進行如下的處理:如果當前沒有Consumer的Channel連接到這個Queue,那么Queue將會把這條消息進行存儲直到有Channel被創建(AMQP協議的不同實現產品中,存儲方式又不盡相同);如果已經有Channel連接到這個Queue,那么消息將會按順序被發送給這個Channel。

    (4)Consumer收到消息后,就可以進行消息的處理了。但是整個消息傳遞的過程還沒有完成:視設置情況,Consumer在完成某一條消息的處理后,將需要手動的發送一條ACK消息給對應的Queue(當然您可以設置為自動發送,或者無需發送)。Queue在收到這條ACK信息后,才會認為這條消息處理成功,並將這條消息從Queue中移除;如果在對應的Channel斷開后,Queue都沒有這條消息的ACK信息,這條消息將會重新被發送給另外的Channel。當然,您還可以發送NACK信息,這樣這條消息將會立即歸隊,並發送給另外的Channel。

  3、Excahnge中Routing的三種模式    

    Exchange交換機在AMQP協議中主要負責按照一定的規則,將收到的消息轉發到已經和它事先綁定好的Queue或者另外的Exchange中。Excahnge交換機的這個處理過程稱之為Routing(路由)。目前流行的AMQP路由實現一共有三種:分別是Direct Exchange、Fanout Exchange和Topic Exchange。需要特別注意的是:Exhange需要具備怎樣的‘路由’規則,並沒有在AMQP標准協議進行強行規定,目前流行的AMQP轉發規則都是AMQP實現產品自行開發的(這也是為什么AMQP消息中和路由、過濾規則相關的屬性是存放在application-properties區域的原因)。

    (1)Direct路由

      direct模式從字面上的理解應該是‘引導’、‘直接’的含義。direct模式下Exchange將使用AMQP消息中所攜帶的Routing-Key和Queue中的Routing Key進行比較。如果兩者完全匹配,就會將這條消息發送到這個Queue中。如下圖所示:

 

    (2)Fanout路由

      Fanout路由模式不需要Routing Key。當設置為Fanout模式的Exchange收到AMQP消息后,將會將這個AMQP消息復制多份,分別發送到和自己綁定的各個Queue中。如下圖所示:

      (3)Topic路由

        Topic模式是Routing Key的匹配模式。Exchange將支持使用‘#’和‘ * ’通配符進行Routing Key的匹配查找(‘#’表示0個或若干個關鍵詞,‘ * ’表示一個關鍵詞,注意是關鍵詞不是字母)。如下圖所示:

    為了方便各位讀者的理解,這里我們再舉幾個通配符匹配的示例:

    (1)“param.#”,可以匹配“param”、“param.test”、“param.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配諸如“param1.test”、“param2.test”以為param這個關鍵詞和param1這個關鍵詞不相同。

    (2)“param.*.* ”,可以匹配“param.test.test”、“param.test.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配諸如“param”、“param.test”、“parm.child”等等Routing Key。

    (3)“param.*.value”,可以匹配“param.value.value”、“param.test.value”等Routing Key;但是不能匹配諸如“param.value”、“param.value.child”等Routing Key。

    注意,以上介紹的Direct 路由模式和Topic 路由模式中,如果Exchange交換機沒有找到任何匹配Routing Key的Queue,那么這條AMQP消息會被丟棄。(只有Queue有保存消息的功能,但是Exchange並不負責保存消息)

 

RabbitMQ框架構建系列目錄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM