RabbitMQ基礎概念詳細介紹


http://www.diggerplus.org/archives/3110

 

本文對rabbitmq基礎介紹,完全是為了下一篇rabbitmq性能測試做准備,讓讀者去了解我們需要測試的是什么樣一個“東西”。

 

引言

你是否遇到過兩個(多個)系統間需要通過定時任務來同步某些數據?你是否在為異構系統的不同進程間相互調用、通訊的問題而苦惱、掙扎?如果是,那么恭喜你,消息服務讓你可以很輕松地解決這些問題。
消息服務擅長於解決多系統、異構系統間的數據交換(消息通知/通訊)問題,你也可以把它用於系統間服務的相互調用(RPC)。本文將要介紹的RabbitMQ就是當前最主流的消息中間件之一。

RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
下面將重點介紹RabbitMQ中的一些基礎概念,了解了這些概念,是使用好RabbitMQ的基礎。

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。ConnectionFactory為Connection的制造工廠。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。

Queue

Queue(隊列)是RabbitMQ的內部對象,用於存儲消息,用下圖表示。
RabbitMQ基礎概念詳細介紹

RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息並消費。

RabbitMQ基礎概念詳細介紹

多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息並處理。
RabbitMQ基礎概念詳細介紹

Message acknowledgment

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息后發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)后才將該消息從Queue中移除;如果RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。這里不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。
這里會產生另外一個問題,如果我們的開發人員在處理完業務邏輯后,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的消息會越來越多;消費者重啟后會重復消費這些消息並重復執行業務邏輯…

Message durability

如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務。由於這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關的事務。

Prefetch count

前面我們講到如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作並一直空閑的情況。我們可以通過設置prefetchCount來限制Queue每次發送給每個消費者的消息數,比如我們設置prefetchCount=1,則Queue每次給每個消費者發送一條消息;消費者處理完這條消息后Queue會再給該消費者發送一條消息。

RabbitMQ基礎概念詳細介紹

Exchange

在上一節我們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。

RabbitMQ基礎概念詳細介紹

Exchange是按照什么邏輯將消息路由到Queue的?這個將在Binding一節介紹。
RabbitMQ中的Exchange有四種類型,不同的類型有着不同的路由策略,這將在Exchange Types一節介紹。

routing key

生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。
在Exchange Type與binding key固定的情況下(在正常使用時一般這些內容都是固定配置好的),我們的生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。
RabbitMQ為routing key設定的長度限制為255 bytes。

Binding

RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
RabbitMQ基礎概念詳細介紹

Binding key

在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key;消費者將消息發送給Exchange時,一般會指定一個routing key;當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。
在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
binding key 並不是在所有情況下都生效,它依賴於Exchange Type,比如fanout類型的Exchange就會無視binding key,而是將消息路由到所有綁定到該Exchange的Queue。

Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規范里還提到兩種Exchange Type,分別為system與自定義,這里不予以描述),下面分別進行介紹。

fanout

fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
RabbitMQ基礎概念詳細介紹

上圖中,生產者(P)發送到Exchange(X)的所有消息都會路由到圖中的兩個Queue,並最終被兩個消費者(C1與C2)消費。

direct

direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。
RabbitMQ基礎概念詳細介紹

以上圖的配置為例,我們以routingKey=”error”發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以routingKey=”info”或routingKey=”warning”來發送消息,則消息只會路由到Queue2。如果我們以其他routingKey發送消息,則消息不會路由到這兩個Queue中。

topic

前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,它約定:

  • routing key為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key與routing key一樣也是句點號“. ”分隔的字符串
  • binding key中可以存在兩種特殊字符“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

RabbitMQ基礎概念詳細介紹

以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。

headers

headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。
該類型的Exchange沒有用到過(不過也應該很有用武之地),所以不做介紹。

RPC

MQ本身是基於異步的消息處理,前面的示例中所有的生產者(P)將消息發送到RabbitMQ后不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成后再進行下一步處理。這相當於RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。
RabbitMQ基礎概念詳細介紹
RabbitMQ中實現RPC的機制是:

  • 客戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協議中定義了14中properties,這些屬性會隨着消息一起發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成后將通知我的消息發送到這個Queue中)和correlationId(此次請求的標識號,服務器處理完成后需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
  • 服務器端收到消息並處理
  • 服務器端處理完消息后,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
  • 客戶端之前已訂閱replyTo指定的Queue,從中收到服務器的應答消息后,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行后續業務處理

總結

本文介紹了RabbitMQ中個人認為最重要的概念,充分利用RabbitMQ提供的這些功能就可以處理我們絕大部分的異步業務了。
本篇的基本概念可能很難理解並消化,結合實際的應用代碼應該會比較容易吸收。所以接下來要寫的文章例中會包含實際的業務應用場景分析,為什么使用RabbitMQ來實現,如何用RabbitMQ來實現。

 

 

http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/[6]RPC/#_5 

 

 

 

http://blog.csdn.net/anzhsoft/article/details/19563091

 

一個客戶端可以和多個exchange相連

consumer指定的時候只要指定QueueName就OK了    

 

這個系統架構圖版權屬於sunjun041640。

    RabbitMQ Server: 也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證數據能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對於普通的應用來說這已經足夠了。當然對於商業系統來說,可以再做一層數據一致性的guard,就可以徹底保證系統的一致性了。

    Client A & B: 也叫Producer,數據的發送方。createmessages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(標簽)。payload顧名思義就是傳輸的數據。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

    Client 1,2,3:也叫Consumer,數據的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱后,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來說,它是不知道誰發送的這個信息的。就是協議本身不支持。但是當然了如果Producer發送的payload包含了Producer的信息就另當別論了。

     對於一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。

        Exchanges are where producers publish their messages.

        Queuesare where the messages end up and are received by consumers

        Bindings are how the messages get routed from the exchange to particular queues.

   還有幾個概念是上述圖中沒有標明的,那就是Connection(連接),Channel(通道,頻道)。

 

   Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。

   Channels: 虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。

    那么,為什么使用Channel,而不是直接使用TCP連接?

    對於OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,而且TCP的連接數也有限制,這也限制了系統處理高並發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對於Producer或者Consumer來說,可以並發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的數據可以Publish10K的數據包。當然對於不同的硬件環境,不同的數據包大小這個數據肯定不一樣,但是我只想說明,對於普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。

 

 

對於API的使用可以參考官方的java文檔,(里面的思想無論是java、cpp還是python都相似)

https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.1/rabbitmq-java-client-javadoc-3.1.1/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String, boolean, java.lang.String, com.rabbitmq.client.Consumer)

 

 

http://m.oschina.net/blog/186202

從AMQP協議可以看出,MessageQueue、Exchange和Binding構成了AMQP協議的核心,下面我們就圍繞這三個主要組件    從應用使用的角度全面的介紹如何利用Rabbit MQ構建消息隊列以及使用過程中的注意事項。

RabbitMQ系列二(構建消息隊列) - 網易杭研后台技術中心 - 網易杭研后台技術中心的博客

 

 

  • 1. 聲明MessageQueue

      在Rabbit MQ中,無論是生產者發送消息還是消費者接受消息,都首先需要聲明一個MessageQueue。這就存在一個問題,是生產者聲明還是消費者聲明呢?要解決這個問題,首先需要明確:

a)消費者是無法訂閱或者獲取不存在的MessageQueue中信息。

b)消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄。

在明白了上述兩點以后,就容易理解如果是消費者去聲明Queue,就有可能會出現在聲明Queue之前,生產者已發送的消息被丟棄的隱患。如果應用能夠通過消息重發的機制允許消息丟失,則使用此方案沒有任何問題。但是如果不能接受該方案,這就需要無論是生產者還是消費者,在發送或者接受消息前,都需要去嘗試建立消息隊列。這里有一點需要明確,如果客戶端嘗試建立一個已經存在的消息隊列,Rabbit MQ不會做任何事情,並返回客戶端建立成功的。

       如果一個消費者在一個信道中正在監聽某一個隊列的消息,Rabbit MQ是不允許該消費者在同一個channel去聲明其他隊列的。Rabbit MQ中,可以通過queue.declare命令聲明一個隊列,可以設置該隊列以下屬性:

a) Exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。

b)   Auto-delete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

 c)   Durable:持久化,這個會在后面作為專門一個章節討論。

d)  其他選項,例如如果用戶僅僅想查詢某一個隊列是否已存在,如果不存在,不想建立該隊列,仍然可以調用queue.declare,只不過需要將參數passive設為true,傳給queue.declare,如果該隊列已存在,則會返回true;如果不存在,則會返回Error,但是不會創建新的隊列。

  • 2. 生產者發送消息

        在AMQP模型中,Exchange是接受生產者消息並將消息路由到消息隊列的關鍵組件。ExchangeType和Binding決定了消息的路由規則。所以生產者想要發送消息,首先必須要聲明一個Exchange和該Exchange對應的Binding。可以通過 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,聲明一個Exchange需要三個參數:ExchangeName,ExchangeType和Durable。ExchangeName是該Exchange的名字,該屬性在創建Binding和生產者通過publish推送消息時需要指定。ExchangeType,指Exchange的類型,在RabbitMQ中,有三種類型的Exchange:direct ,fanout和topic,不同的Exchange會表現出不同路由行為。Durable是該Exchange的持久化屬性,這個會在消息持久化章節討論。聲明一個Binding需要提供一個QueueName,ExchangeName和BindingKey。下面我們就分析一下不同的ExchangeType表現出的不同路由規則。

        生產者在發送消息時,都需要指定一個RoutingKey和Exchange,Exchange在接到該RoutingKey以后,會判斷該ExchangeType:

                         a) 如果是Direct類型,則會將消息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,則發送到該Binding對應的Queue中。

RabbitMQ系列二(構建消息隊列) - 網易杭研后台技術中心 - 網易杭研后台技術中心的博客 

                  b)   如果是  Fanout  類型,則會將消息發送給所有與該  Exchange  定義過  Binding  的所有  Queues  中去,其實是一種廣播行為。
           

RabbitMQ系列二(構建消息隊列) - 網易杭研后台技術中心 - 網易杭研后台技術中心的博客 

        c)如果是Topic類型,則會按照正則表達式,對RoutingKey與BindingKey進行匹配,如果匹配成功,則發送到對應的Queue中。

             RabbitMQ系列二(構建消息隊列) - 網易杭研后台技術中心 - 網易杭研后台技術中心的博客

  • 3. 消費者訂閱消息    

    在RabbitMQ中消費者有2種方式獲取隊列中的消息:

       a)  一種是通過basic.consume命令,訂閱某一個隊列中的消息,channel會自動在處理完上一條消息之后,接收下一條消息。(同一個channel消息處理是串行的)。除非關閉channel或者取消訂閱,否則客戶端將會一直接收隊列的消息。

             b)  另外一種方式是通過basic.get命令主動獲取隊列中的消息,但是絕對不可以通過循環調用basic.get來代替basic.consume,這是因為basic.get RabbitMQ在實際執行的時候,是首先consume某一個隊列,然后檢索第一條消息,然后再取消訂閱。如果是高吞吐率的消費者,最好還是建議使用basic.consume。

      如果有多個消費者同時訂閱同一個隊列的話,RabbitMQ是采用循環的方式分發消息的,每一條消息只能被一個訂閱者接收。例如,有隊列Queue,其中ClientA和ClientB都Consume了該隊列,MessageA到達隊列后,被分派到ClientA,ClientA回復服務器收到響應,服務器刪除MessageA;再有一條消息MessageB抵達隊列,服務器根據“循環推送”原則,將消息會發給ClientB,然后收到ClientB的確認后,刪除MessageB;等到再下一條消息時,服務器會再將消息發送給ClientA。

       這里我們可以看出,消費者再接到消息以后,都需要給服務器發送一條確認命令,這個即可以在handleDelivery里顯示的調用basic.ack實現,也可以在Consume某個隊列的時候,設置autoACK屬性為true實現。這個ACK僅僅是通知服務器可以安全的刪除該消息,而不是通知生產者,與RPC不同。 如果消費者在接到消息以后還沒來得及返回ACK就斷開了連接,消息服務器會重傳該消息給下一個訂閱者,如果沒有訂閱者就會存儲該消息。

        既然RabbitMQ提供了ACK某一個消息的命令,當然也提供了Reject某一個消息的命令。當客戶端發生錯誤,調用basic.reject命令拒絕某一個消息時,可以設置一個requeue的屬性,如果為true,則消息服務器會重傳該消息給下一個訂閱者;如果為false,則會直接刪除該消息。當然,也可以通過ack,讓消息服務器直接刪除該消息並且不會重傳。

  • 4. 持久化:

        Rabbit MQ默認是不持久隊列、Exchange、Binding以及隊列中的消息的,這意味着一旦消息服務器重啟,所有已聲明的隊列,Exchange,Binding以及隊列中的消息都會丟失。通過設置Exchange和MessageQueue的durable屬性為true,可以使得隊列和Exchange持久化,但是這還不能使得隊列中的消息持久化,這需要生產者在發送消息的時候,將delivery mode設置為2,只有這3個全部設置完成后,才能保證服務器重啟不會對現有的隊列造成影響。這里需要注意的是,只有durable為true的Exchange和durable為ture的Queues才能綁定,否則在綁定時,RabbitMQ都會拋錯的。持久化會對RabbitMQ的性能造成比較大的影響,可能會下降10倍不止。

  • 5. 事務:

     對事務的支持是AMQP協議的一個重要特性。假設當生產者將一個持久化消息發送給服務器時,因為consume命令本身沒有任何Response返回,所以即使服務器崩潰,沒有持久化該消息,生產者也無法獲知該消息已經丟失。如果此時使用事務,即通過txSelect()開啟一個事務,然后發送消息給服務器,然后通過txCommit()提交該事務,即可以保證,如果txCommit()提交了,則該消息一定會持久化,如果txCommit()還未提交即服務器崩潰,則該消息不會服務器就收。當然Rabbit MQ也提供了txRollback()命令用於回滾某一個事務。

  • 6. Confirm機制:

      使用事務固然可以保證只有提交的事務,才會被服務器執行。但是這樣同時也將客戶端與消息服務器同步起來,這背離了消息隊列解耦的本質。Rabbit MQ提供了一個更加輕量級的機制來保證生產者可以感知服務器消息是否已被路由到正確的隊列中——Confirm。如果設置channel為confirm狀態,則通過該channel發送的消息都會被分配一個唯一的ID,然后一旦該消息被正確的路由到匹配的隊列中后,服務器會返回給生產者一個Confirm,該Confirm包含該消息的ID,這樣生產者就會知道該消息已被正確分發。對於持久化消息,只有該消息被持久化后,才會返回Confirm。Confirm機制的最大優點在於異步,生產者在發送消息以后,即可繼續執行其他任務。而服務器返回Confirm后,會觸發生產者的回調函數,生產者在回調函數中處理Confirm信息。如果消息服務器發生異常,導致該消息丟失,會返回給生產者一個nack,表示消息已經丟失,這樣生產者就可以通過重發消息,保證消息不丟失。Confirm機制在性能上要比事務優越很多。但是Confirm機制,無法進行回滾,就是一旦服務器崩潰,生產者無法得到Confirm信息,生產者其實本身也不知道該消息吃否已經被持久化,只有繼續重發來保證消息不丟失,但是如果原先已經持久化的消息,並不會被回滾,這樣隊列中就會存在兩條相同的消息,系統需要支持去重。

 

  • 其他:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。

 

 

 

Linux下安裝RabbitMQ

標簽: rabbitmq消息隊列

1. 安裝erlang:

下載所需的源碼:wget http://erlang.org/download/otp_src_R13B04.tar.gz wget http://erlang.org/download/otp_src_R13B04.tar.gz

然后./configure && make && make install

 

注:在configure之后發現有以下提示信息:

(1) odbc : ODBC library – link check failed

需要yum -y install unixODBC unixODBC-devel

(2)“ wxWidgets not found, wx will NOT be usable”及“fop is missing.”這兩個可以忽略。

 

 

安裝完畢,在命令行鍵入erl,將會出現如下命令行:

Erlang R13B04 (erts-5.7.5) [source] [smp:2:2] [rq:2][async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.5 

(abort with ^G)
1>

 

2.安裝RabbitMQ

RabbitMQ的安裝有很多版本,我們使用Generic Unix版本。

http://www.rabbitmq.com/install-generic-unix.html中下載http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.3/rabbitmq-server-generic-unix-3.4.3.tar.gz並解壓到Linux中的任意目錄(/mysoftware/rabbitmq_server-3.4.3/),解壓后即可使用。

 

3.RabbitMQ開啟與關閉

進入rabbitmq安裝目錄下的sbin目錄,分別執行以下命令:

  1. 開啟RabbitMQ服務:.  
  2. ./rabbitmq-server -detached (-detached為可選參數,表示后台開啟)  
  3.   
  4. 開啟RabbitMQ管理工具,通過瀏覽器訪問http://localhost:15672使用:.  
  5. ./rabbitmq-plugins enable rabbitmq_management (3.3.1版本以后默認不允許用guest遠程管理。可以手動創建管理員帳號來遠程管理,具體參見4.4)  
  6.   
  7. 關閉RabbitMQ服務:  
  8. ./rabbitmqctl stop  
開啟RabbitMQ服務:.
./rabbitmq-server -detached (-detached為可選參數,表示后台開啟)

開啟RabbitMQ管理工具,通過瀏覽器訪問http://localhost:15672使用:.
./rabbitmq-plugins enable rabbitmq_management (3.3.1版本以后默認不允許用guest遠程管理。可以手動創建管理員帳號來遠程管理,具體參見4.4)

關閉RabbitMQ服務:
./rabbitmqctl stop
 
 
注意:如果是登錄遠程管理界面(比如物理機訪問虛擬機的管理界面時),除了關閉防火牆開通相應允許的端口以外,也不能用guest/guest訪問,而應該新創建用戶並賦予管理員權限才可訪問
增加用戶:rabbitmqctl add_user ddd ddd
設置管理員權限: rabbitmqctl set_user_tags diego administrator 這樣才可以訪問web頁面提供的管理頁面
./rabbitmqctl  set_permissions -p / diego '.*' '.*' '.*'  給用戶diego加上有訪問虛擬空間/的權限
 

4.設置RabbitMQ管理權限

 

Rabbitmq服務器的主要通過rabbitmqctl和rabbimq-plugins兩個工具來管理,以下是一些常用功能。

1). 服務器啟動與關閉

      啟動: rabbitmq-server –detached

      關閉:rabbitmqctl stop

      若單機有多個實例,則在rabbitmqctlh后加–n 指定名稱

2). 插件管理

      開啟某個插件:rabbitmq-pluginsenable xxx

      關閉某個插件:rabbitmq-pluginsdisablexxx

      注意:重啟服務器后生效。

3).virtual_host管理

      新建virtual_host: rabbitmqctladd_vhost  xxx

      撤銷virtual_host:rabbitmqctl  delete_vhost xxx

4). 用戶管理

      新建用戶:rabbitmqctl add_user xxx pwd

      刪除用戶:   rabbitmqctl delete_user xxx

      改密碼: rabbimqctl change_password {username} {newpassword}

      設置用戶角色:rabbitmqctl set_user_tags {username} {tag ...}

              Tag可以為 administrator,monitoring, management

5). 權限管理

      權限設置:set_permissions [-pvhostpath] {user} {conf} {write} {read}

               Vhostpath

               Vhost路徑

               user

     用戶名

              Conf

      一個正則表達式match哪些配置資源能夠被該用戶訪問。

              Write

      一個正則表達式match哪些配置資源能夠被該用戶讀。

               Read

      一個正則表達式match哪些配置資源能夠被該用戶訪問。

6). 獲取服務器狀態信息

       服務器狀態:rabbitmqctl status

       隊列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...]

                Queueinfoitem可以為:name,durable,auto_delete,arguments,messages_ready,

                messages_unacknowledged,messages,consumers,memory

       Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...]

                 Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments.

       Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...]       

                 Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments

       Connection信息:rabbitmqctllist_connections [connectioninfoitem ...]

       Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。

       Channel信息:rabbitmqctl  list_channels[channelinfoitem ...]

      Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked

 

 

 

rabbitmq 實現原理

AMQP(高級消息隊列協議 Advanced Message Queue Protocol)

AMQP當中有四個概念非常重要: 虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)。一個虛擬主機持有一組交換機、隊列和綁定。為什么需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創 建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”。

Producer 要產生消息必須要創建一個 Exchange ,Exchange 用於轉發消息,但是它不會做存儲,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息,當然如果消息總是發送過去就被直接丟棄那就沒有什么意思了,一個 Consumer 想要接受消息的話,就要創建一個 Queue ,並把這個 Queue bind 到指定的 Exchange 上,然后 Exchange 會把消息轉發到 Queue 那里,Queue 會負責存儲消息,Consumer 可以通過主動 Pop 或者是 Subscribe 之后被動回調的方式來從 Queue 中取得消息。

 

Exchange,Queue,RoutingKey

 

藍色-- Client(相對於Rabbitmq Server來說)

綠色--Exchange

紅色—Queue

     - 交換器(Exchange),它是發送消息的實體。

     - 隊列(Queue),這是接收消息的實體。

     - 綁定器(Bind),將交換器和隊列連接起來,並且封裝消息的路由信息。

 

Exchange指向Queue的黑色線—RoutingKey,可以將它簡單的理解為一條連接Exchange和Queue的路線

 

Exchange和Queue都需要通過channel來進行定義,而RoutingKey則只需要在binding時取個名字就行了。

 

這一塊的理解是不正確的,

Exchange Queue RoutingKey關系說明:

 

Exchange Name

Queue Name

Routing Key

 

test.queue     

test.queue

 

test.queue2    

test.queue2

test.exchange  

test.queue     

test.routingkey

test.exchange  

test.queue2    

test.routingkey

test.exchange  

test.queue     

test.routingkey2

test.exchange1

test.queue     

test.routingkey

 

由結果可以看出,由Exchange,Queue,RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。

 

 

 

 

左邊的Client向右邊的Client發送消息,流程:

 

1,  獲取Conection

 

2,  獲取Channel

 

3,  定義Exchange,Queue

 

4,  使用一個RoutingKey將Queue Binding到一個Exchange上

 

5,  通過指定一個Exchange和一個RoutingKey來將消息發送到對應的Queue上,

 

6,  接收方在接收時也是獲取connection,接着獲取channel,然后指定一個Queue直接到它關心的Queue上取消息,它對Exchange,RoutingKey及如何binding都不關心,到對應的Queue上去取消息就OK了(這個是重點,接收方,只關心queue的名字即可,其他的都不關心!!!!!)

 

換句話說:

發送放關心:ExchangName, QueueName,BindingKey

接收方關心:QuueuName

 

一個Client發送消息,哪些Client可以收到消息,其核心就在於Exchange,RoutingKey,Queue的關系上。

 

 

Exchange

RoutingKey

Queue

1

E1

R1

Q1

2

 

R2

Q2

3

E2

R3

Q1

4

 

R4

Q2

5

E1

R5

Q1

6

E2

R6

Q1

 

我們可以這樣理解,RoutingKey就像是個中間表,將兩個表的數據進行多對多關聯,只不過對於相同的Exchange和Queue,可以使用不同的RoutingKey重復關聯多次。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM