RabbitMQ與java、Spring結合實例詳細講解【專題】


RabbitMQ可以理解為軟件的路由器。




從AMQP協議可以看出,MessageQueue、Exchange和Binding構成了AMQP協議的核心。
根據綁定規則將隊列綁定到交換器上;
消息是發布到交換器上的;
有三種類型的交換器:direct,fanout和topic
基於消息的路由鍵和交換器類型,服務器決定將消息投遞到哪個隊列去。

下面我們就圍繞這三個主要組件,從應用使用的角度全面的介紹如何利用Rabbit MQ構建消息隊列以及使用過程中的注意事項。



一個queue是否可以與多個交換機進行綁定呢?????
可以的,直接在RoutingKey上配置就可以了。

消費者通過以下兩種方式從特定隊列中接收消息:
(1)通過AMQP的basic.consume命令訂閱。這樣做會將信道置為接收模式,直到取消對隊列的訂閱為止。
訂閱了消息后,消費者在消費(或者拒絕)最近接收的那條消息后,就能從隊列中(可用的)自動接收下一條消息。
如果消費者處理隊列消息,並且/或者需要在消息一到達隊列時就自動接收的話,你應該使用basic.consume。
(2)某些時候,你只想從隊列獲得單條消息而不是持續訂閱。
向隊列請求單條消息是通過AMQP的basic.get命令實現的。這樣做可以讓消費者接收隊列中的下一條消息。
如果要獲得更多消息的話,需要再次發送basic.get命令。
不應該將basic.get放在一個循環里來替代basic.consume。因為這樣做會影響Rabbit的性能。大致上講,basic.get命令會訂閱消息,然后取消訂閱。
消費者理應始終使用basic.consume來實現高吞吐量。

如果至少有一個消費者訂閱了隊列的話,消息會立即發送給這些訂閱的消費者。
但是如果消息到達了無人訂閱的隊列呢?
在這種情況下,消息會在隊列中等待。一旦有消費者訂閱到該隊列,那么隊列上的消息就會發送給消費者。
更有趣的問題是,當有多個消費者訂閱到同一隊列上時,消息是如何分發的。
當Rabbit隊列擁有多個消費者時,隊列收到的消息將以循環(round-robin)的方式發送給消費者。每條消息只會發送給一個訂閱者

消費者接收到的每一條消息都必須進行確認。消費者必須通過AMQP的basic.ack命令顯式地向RabbitMQ發送一個確認,或者在訂閱隊列的時候就將auto_ack參數設置為true。當設置了auto_ack時,一旦消費者接收消息,RabbitMQ會自動視其確認了消息。
需要強調的是:
消費者對消息的確認 【是否成功消費,可以刪除隊列中的消息】

告訴生產者消息已經被接收了【消息已經成功到達隊列,生產者的任務已經完成了】 這兩件事 毫不相關。
因此,消費者通過確認命令告訴RabbitMQ它已經正確地接收了消息,同時RabbitMQ才能安全地把消息從隊列中刪除。

如果消費者收到一條消息,然后確認之前從Rabbit斷開連接(或者從隊列上取消訂閱),RabbitMQ會認為這條消息沒有分發,然后重新分發給下一個訂閱的消費者。
如果你的應用程序崩潰了,這樣做可以確保消息會被發送給另一個消費者進行處理。
另一方面,如果應用程序有bug而忘記確認消息的話,Rabbit將不會給該消費者發送更多消息了。這是因為在上一條消息被確認之前,Rabbit會認為這個消費者並沒有准備好接收下一條消息。
這個特性是在削峰場景比較有用。 
因為,如果處理消息內容非常耗時或耗資源,則你的應用程序可以延遲確認該消息,直到消息處理完成。這樣可以防止Rabbit持續不斷的消息涌向你的應用而導致過載
Rabbit給消費者1發送的消息一直沒有確認,在不超時的情況下Rabbit是否會給另一個消費者2發送下一條消息呢?????
不會。只要一個消費者不確認,其它的消費者就都不會消費到消息,即便隊列積壓了。

在收到消息后,如果你想要明確拒絕而不是確認收到該消息的話,該如何呢?
舉例來說,假設在處理消息的時候你遇到了不可恢復的錯誤,但是由於硬件問題,只影響到當前的消費者(這就是一個很好的示例,直到消息處理完成之前,你絕不能進行確認)。
只要消息尚未確認,則你有以下兩個選擇:
(1)把消費者從RabbitMQ服務器斷開連接。這會導致RabbitMQ自動重新把消息入隊並發送給另一個消費者。這樣做的好處是所有的RabbitMQ版本都支持。
   缺點是,這樣 建立/斷開連接的方式會額外增加RabbitMQ的負擔(如果消費者在處理每條消息時都遇到錯誤的話,會導致潛在的重大負荷)
(2)如果你正使用RabbitMQ2.0.0或更新的版本,那就使用AMQP的basic.reject命令。顧名思義:basic.reject允許消費者拒絕RabbitMQ發送的消息。
如果把reject命令的requue參數設置成true的話,RabbitMQ會將消息重新發送給下一個訂閱的消費者。
如果把reject命令的requeue參數設置為false的話,RabbitMQ立即會把消息從隊列中移除,而不會把它發送給新的消費者。
你也可以通過對消息確認的方式來簡單地忽略該消息(這種忽略消息的方式的優勢在於所有版本的RabbitMQ都支持)。如果你檢測到一條格式錯誤的消息而任何一個消費者都無法處理的時候,這樣做就十分有用。

注意:
當丟棄一條消息時,為什么要使用basic.reject命令,並將requeue參數設置成false來替代確認消息呢?
在將來的RabbitMQ版本中會支持一個特殊的“死信”(dead letter)隊列,用來存放那些被拒絕而不重入隊列的消息。死信隊列讓你通過檢測拒絕/未送達的消息來發現問題。
如果應用程序想自動從死信隊列功能中獲益的話,需要使用reject命令並將requeue參數設置成false

還有一件更重要的事情:如何創建隊列。
消費者和生產者都能使用AMQP的queue.declare命令來創建隊列。
但是如果消費者在同一條信道上訂閱了另一個隊列的話,就無法再聲明隊列了。必須首先取消訂閱,將信道置為“傳輸”模式。
當創建隊列時,你常常想要指定隊列名稱。
消費者訂閱隊列時需要隊列名稱,並在創建綁定時也需要指定隊列名稱。
如果不指定隊列名稱的話,Rabbit會分配一個隨機名稱並在queue.declare命令的響應中返回(對於構建在AMQP上的RPC應用來說,使用臨時“匿名”隊列很有用)。
以下是隊列設置中另一些有用的參數:
exclusive【專用的、專有的、排他的】:如果設置為true的話,隊列將變成私有的,此時只有你的應用程序才能夠消費隊列消息。當你想要限制一個隊列只有一個消費者的時候很有幫助。
auto-delete:當最后一個消費者取消訂閱的時候,隊列就會自動移除【只有交換器】。如果你需要臨時隊列只為一個消費者服務的話,請結合使用auto-delete和exclusive。當消費者斷開連接時,隊列就被移除了。

如果嘗試聲明一個已經存在的隊列會發生什么呢?
只要聲明參數完全匹配現存的隊列的話,Rabbit就什么也不做,並成功返回,就好像這個隊列已經創建成功一樣(如果參數不匹配的話,隊列聲明嘗試失敗)。
如果只是想檢測隊列是否存在,則可以設置queue.declare的passive選項為true。在該設置下,如果隊列存在,那么queue.declare命令會成功返回;如果隊列不存在的話,queue.declare命令不會創建隊列會返回一個錯誤

當設計應用程序時,你最有可能會問自己,是該由生產者還是消費者來創建所需的隊列呢
看起來最自然的答案是由消費者來創建隊列。畢竟,消費者才需要訂閱隊列,而且總不能訂閱一個不存在的隊列,是吧?先別這么快下結論。你首先需要想清楚消息的生產者能否承擔得起丟失消息。
發送出去的消息如果路由到了不存在的隊列的話,Rabbit會忽略它們
因此,如果你不能承擔得起消息進入“黑洞”而丟失的話,你的生產者和消費者就都應該嘗試去創建隊列。
另一方面,如果你能承擔得起丟失消息,或者你實現一種方法來重新發布未處理的消息的話(我們會向你展現如何做到這一點),你可以只讓自己的消費者來聲明隊列。
隊列是AMQP消息通信的基礎模塊:
(1)為消息提供了存儲空間,消息在此等待消費
(2)對負載均衡來說,隊列是絕佳方案。只需附加一堆消費者,並讓RabbitMQ以循環的方式均勻地分配發來的消息。
(3)隊列是Rabbit中消息的最后終點(除非消息進入了“黑洞”)



消息如何到達隊列呢?
當你想要將消息投遞到隊列時,你通過把消息發送給交換器來完成。然后根據確定的規則,RabbitMQ將會決定消息該投遞到哪個隊列。這些規則被稱作路由鍵(routing key)。
隊列通過路由鍵綁定到交換器。當你把消息發送到代理服務器時,消息將擁有一個路由鍵----即便是空的-----RabbitMQ也會將其和綁定使用路由鍵進行匹配。如果相匹配的話,那么消息將會投遞到該隊列。
如果路由的消息不匹配任何綁定模式的話,消息將進入“黑洞”

使用交換器和綁定來完成不同使用場景之外,還有另外一個好處是:對於發送消息給服務器的發布者來說,它不需要關心服務器的另一端(整個消息處理環節中的隊列和消費者)的邏輯。
direct交換器:如果路由鍵匹配的話,消息就被投遞到對應的隊列。
服務器必須實現direct類型交換器,包含一個空白字符串的默認交換器。
當聲明一個隊列時,它會自動綁定到默認交換器,並以隊列名稱作為路由鍵。
這意味着你可以使用如下代碼發送消息到之前聲明的隊列去。前提是你已經獲得了信道實例:

$channel->basic_publish($msg,'','queue-name');

第一個參數是你想要發的消息內容;
第二個參數是一個空的字符串,指定了默認交換器
第三個參數就是路由鍵。
這個路由鍵就是之前用來聲明隊列的名稱。

當默認的direct交換器無法滿足應用程序的需求時,你可以聲明你自己的交換器。只需發送 exchange.declare命令並設置合適的參數就行了。

fanout【扇出;展開;分列(賬戶)】交接器 會將收到的消息廣播到綁定的隊列上。
消息通信模式很簡單:當你發送一條消息到fanout交換器時,它會把消息投遞給所有附加在此交換器上的隊列 【一條消息會copy多份分發給不同的消費者】
這允許對單條消息做不同方式的反應。 類似於監聽者模式。

topic交換器:可以將來自不同源頭的消息能夠到達同一隊列。 隊列綁定到交換器上的時候可以使用通配符。 譬如“ *.msg-inbox ”,單個“.”把路由鍵分為了幾部分,“*”匹配特定位置的任意文本為了實現匹配所有規則,你可以使用“#”字符
*” 操作符會將 “.”視為分隔符;與之不同的是,“#”操作符沒有分塊的概念,它將任意 “.”字符均視為關鍵字的匹配部分。



 

 

  • 1. 聲明MessageQueue

      在Rabbit MQ中,無論是生產者發送消息還是消費者接受消息,都首先需要聲明一個MessageQueue。這就存在一個問題,是生產者聲明還是消費者聲明呢?要解決這個問題,首先需要明確:

a)消費者是無法訂閱或者獲取不存在的MessageQueue中信息。

b)消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄。

在明白了上述兩點以后,就容易理解如果是消費者去聲明Queue,就有可能會出現在聲明Queue之前,生產者已發送的消息被丟棄的隱患。如果應用能夠通過消息重發的機制允許消息丟失,則使用此方案沒有任何問題。但是如果不能接受該方案,這就需要無論是生產者還是消費者,在發送或者接受消息前,都需要去嘗試建立消息隊列。這里有一點需要明確,如果客戶端嘗試建立一個已經存在的消息隊列,Rabbit MQ不會做任何事情,並返回客戶端建立成功的。

       如果一個消費者在一個信道中正在監聽某一個隊列的消息,Rabbit MQ是不允許該消費者在同一個channel去聲明其他隊列的。Rabbit MQ中,可以通過queue.declare命令聲明一個隊列,可以設置該隊列以下屬性:

a) Exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。

b)   Auto-delete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

c)   Durable:持久化,這個會在后面作為專門一個章節討論。

d)  其他選項,例如如果用戶僅僅想查詢某一個隊列是否已存在,如果不存在,不想建立該隊列,仍然可以調用queue.declare,只不過需要將參數passive設為true,傳給queue.declare,如果該隊列已存在,則會返回true;如果不存在,則會返回Error,但是不會創建新的隊列。

  • 2. 生產者發送消息

        在AMQP模型中,Exchange是接受生產者消息並將消息路由到消息隊列的關鍵組件。ExchangeType和Binding決定了消息的路由規則。所以生產者想要發送消息,首先必須要聲明一個Exchange和該Exchange對應的Binding。可以通過 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,聲明一個Exchange需要三個參數:ExchangeName,ExchangeType和Durable。ExchangeName是該Exchange的名字,該屬性在創建Binding和生產者通過publish推送消息時需要指定。ExchangeType,指Exchange的類型,在RabbitMQ中,有三種類型的Exchange:direct ,fanout和topic,不同的Exchange會表現出不同路由行為。Durable是該Exchange的持久化屬性,這個會在消息持久化章節討論。聲明一個Binding需要提供一個QueueName,ExchangeName和BindingKey。下面我們就分析一下不同的ExchangeType表現出的不同路由規則。

生產者在發送消息時,都需要指定一個RoutingKey和Exchange,Exchange在接到該RoutingKey以后,會判斷該ExchangeType:

                  a) 如果是Direct類型,則會將消息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,則發送到該Binding對應的Queue中。

                  b)   如果是  Fanout  類型,則會將消息發送給所有與該  Exchange  定義過  Binding  的所有  Queues  中去,其實是一種廣播行為。

        c)如果是Topic類型,則會按照正則表達式,對RoutingKey與BindingKey進行匹配,如果匹配成功,則發送到對應的Queue中。

             

  • 3. 消費者訂閱消息    

    在RabbitMQ中消費者有2種方式獲取隊列中的消息:

       a)  一種是通過basic.consume命令,訂閱某一個隊列中的消息,channel會自動在處理完上一條消息之后,接收下一條消息。(同一個channel消息處理是串行的)。除非關閉channel或者取消訂閱,否則客戶端將會一直接收隊列的消息。

       b)  另外一種方式是通過basic.get命令主動獲取隊列中的消息,但是絕對不可以通過循環調用basic.get來代替basic.consume,這是因為basic.get RabbitMQ在實際執行的時候,是首先consume某一個隊列,然后檢索第一條消息,然后再取消訂閱。如果是高吞吐率的消費者,最好還是建議使用basic.consume。

      如果有多個消費者同時訂閱同一個隊列的話,RabbitMQ是采用循環的方式分發消息的,每一條消息只能被一個訂閱者接收。例如,有隊列Queue,其中ClientA和ClientB都Consume了該隊列,MessageA到達隊列后,被分派到ClientA,ClientA服務器收到響應,服務器刪除MessageA;再有一條消息MessageB抵達隊列,服務器根據“循環推送”原則,將消息會發給ClientB,然后收到ClientB的確認后,刪除MessageB;等到再下一條消息時,服務器會再將消息發送給ClientA。

       這里我們可以看出,消費者再接到消息以后,都需要給服務器發送一條確認命令,這個即可以在handleDelivery里顯示的調用basic.ack實現,也可以在Consume某個隊列的時候,設置autoACK屬性為true實現。這個ACK僅僅是通知服務器可以安全的刪除該消息,而不是通知生產者,與RPC不同。 如果消費者在接到消息以后還沒來得及返回ACK就斷開了連接,消息服務器會重傳該消息給下一個訂閱者,如果沒有訂閱者就會存儲該消息。

        既然RabbitMQ提供了ACK某一個消息的命令,當然也提供了Reject某一個消息的命令。當客戶端發生錯誤,調用basic.reject命令拒絕某一個消息時,可以設置一個requeue的屬性,如果為true,則消息服務器會重傳該消息給下一個訂閱者;如果為false,則會直接刪除該消息。當然,也可以通過ack,讓消息服務器直接刪除該消息並且不會重傳。

  • 4. 持久化:

        Rabbit MQ默認是不持久隊列、Exchange、Binding以及隊列中的消息的,這意味着一旦消息服務器重啟,所有已聲明的隊列,Exchange,Binding以及隊列中的消息都會丟失。通過設置Exchange和MessageQueue的durable屬性為true,可以使得隊列和Exchange持久化,但是這還不能使得隊列中的消息持久化,這需要生產者在發送消息的時候,將delivery mode設置為2,只有這3個全部設置完成后,才能保證服務器重啟不會對現有的隊列造成影響。這里需要注意的是,只有durable為true的Exchange和durable為ture的Queues才能綁定,否則在綁定時,RabbitMQ都會拋錯的。持久化會對RabbitMQ的性能造成比較大的影響,可能會下降10倍不止。

  • 5. 事務:

     對事務的支持是AMQP協議的一個重要特性。假設當生產者將一個持久化消息發送給服務器時,因為consume命令本身沒有任何Response返回,所以即使服務器崩潰,沒有持久化該消息,生產者也無法獲知該消息已經丟失。如果此時使用事務,即通過txSelect()開啟一個事務,然后發送消息給服務器,然后通過txCommit()提交該事務,即可以保證,如果txCommit()提交了,則該消息一定會持久化,如果txCommit()還未提交即服務器崩潰,則該消息不會服務器就收。當然Rabbit MQ也提供了txRollback()命令用於回滾某一個事務。

  • 6. Confirm機制:

      使用事務固然可以保證只有提交的事務,才會被服務器執行。但是這樣同時也將客戶端與消息服務器同步起來,這背離了消息隊列解耦的本質。Rabbit MQ提供了一個更加輕量級的機制來保證生產者可以感知服務器消息是否已被路由到正確的隊列中——Confirm。如果設置channel為confirm狀態,則通過該channel發送的消息都會被分配一個唯一的ID,然后一旦該消息被正確的路由到匹配的隊列中后,服務器會返回給生產者一個Confirm,該Confirm包含該消息的ID,這樣生產者就會知道該消息已被正確分發。對於持久化消息,只有該消息被持久化后,才會返回Confirm。Confirm機制的最大優點在於異步,生產者在發送消息以后,即可繼續執行其他任務。而服務器返回Confirm后,會觸發生產者的回調函數,生產者在回調函數中處理Confirm信息。如果消息服務器發生異常,導致該消息丟失,會返回給生產者一個nack,表示消息已經丟失,這樣生產者就可以通過重發消息,保證消息不丟失。Confirm機制在性能上要比事務優越很多。但是Confirm機制,無法進行回滾,就是一旦服務器崩潰,生產者無法得到Confirm信息,生產者其實本身也不知道該消息吃否已經被持久化,只有繼續重發來保證消息不丟失,但是如果原先已經持久化的消息,並不會被回滾,這樣隊列中就會存在兩條相同的消息,系統需要支持去重。

  • 其他:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。

 

Exchanges, queues, and bindings

exchanges, queues, and bindings是三個基礎的概念, 他們的作用是:
exchanges are where producers publish their messages, 
queues are where the messages end up and are received by consumers,
and  bindings are how the messages get routed from the exchange to particular queues. 
  
下面我們用一副簡單的思維導圖把上面的概念組織起來:
 
 
上面還提到了一個vhost的概念,vhost是為了組織exchanges, queues, and bindings提出的概念,我們就從它開始講起:

VHost

   Vhosts也是AMQP的一個基礎概念,連接到RabbitMQ默認就有一個名為"/"的vhost可用,本地調試的時候可以直接使用這個默認的vhost.這個"/"的訪問可以使用guest用戶名(密碼guest)訪問.可以使用rabbitmqctl工具修改這個賬戶的權限和密碼,這在生產環境是必須要關注的. 出於安全和可移植性的考慮,一個vhost內的exchange不能綁定到其他的vhost.
 
可以按照業務功能組來規划vhost,在集群環境中只要在某個節點創建vhost就會在整個集群內的節點都創建該vhost.VHost和權限都不能通過AMQP協議創建,在RabbitMQ中都是使用rabbitmqctl進行創建,管理.
 
如何創建vhost   
vhost和permission(權限)信息是並不是通過AMQP創建而是通過rabbitmqctl工具來添加,管理的.


RabbitMQ 用戶權限 分兩部分:
(1)vhost: 數據隔離。 權限控制的基礎,rabbitMQ的用戶只能訪問分配給自己的vhost
(2)Policy:可以動態更新用戶對指定vhost下exchange和queue的訪問行為,而不用重啟應用。 一些參數也可以變相地用在權限控制方面的,譬如 max Length Bytes,如果設置為0, 對Exchange和queuey設置后,這個用戶就無法從這個queue中接收消息,即取消用戶對這個隊列的權限。
這個功能對操作人員要求比較高。

 

 https://blog.csdn.net/sinat_36553913/article/details/93537601?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4



 
說完vhost我們就來看看重中之重的消息:Message

Message

消息由兩部分組成:  payload and  label. "payload"是實際要傳輸的數據,至於數據的格式RabbitMQ並不關心,"label"描述payload,包括exchange name 和可選的topic tag.消息一旦到了consumer那里就只有payload部分了,label部分並沒有帶過來.RabbitMQ並不告訴你消息是誰發出的.這好比你收到一封信但是信封上是空白的.當然想知道是誰發的還是有辦法的,在消息內容中包含發送者的信息就可以了.
消息的consumer和producer對應的概念是sending和receiving並不對應client和server.通過channel我們可以創建很多並行的傳輸 TCP鏈接不再成為瓶頸,我們可以把RabbitMQ當做應用程序級別的路由器.
 
 
Consumer消息的接收方式
Consumer有兩種方式接收消息:
通過 basic.consume 訂閱隊列.channel將進入接收模式直到你取消訂閱.訂閱模式下Consumer只要上一條消息處理完成(ACK或拒絕),就會主動接收新消息.如果消息到達queue就希望得到盡快處理,也應該使用basic.consume命令.
還有一種情況,我們不需要一直保持訂閱,只要使用basic.get命令主動獲取消息即可.當前消息處理完成之后,繼續獲取消息需要主動執行basic.get 不要"在循環中使用basic.ge"t當做另外一種形式的basic.consume,因為這種做法相比basic.consume有額外的成本:basic.get本質上就是先訂閱queue取回一條消息之后取消訂閱.Consumer吞吐量大的情況下通常都會使用basic.consume.
 
 
要是沒有Consumer怎么辦?
如果消息沒有Consumer就會老老實實呆在隊列里面.
 
多個Consumer訂閱同一個隊列
只要Consumer訂閱了queue,消息就會發送到該Consumer.我們的問題是這種情況下queue中的消息是如何分發的?
如果一個rabbit queue有多個consumer,具體到隊列中的某條消息只會發送到其中的一個Consumer.
 
消息確認
所有接收到的消息都要求發送響應消息(ACK).這里有兩種方式一種是Consumer使用basic.ack明確發送ACK,一種是訂閱queue的時候指定auto_ack為true,這樣消息一到Consumer那里RabbitMQ就會認為消息已經得到ACK.
要注意的是這里的響應和消息的發送者沒有絲毫關系,ACK只是Consumer向RabbitMQ確認消息已經正確的接收到消息,RabbitMQ可以安全移除該消息,僅此而已.
 
沒有正確響應怎么辦
如果Consumer接收了一個消息就還沒有發送ACK就與RabbitMQ斷開了,RabbitMQ會認為這條消息沒有投遞成功會重新投遞到別的Consumer.如果你的應用程序崩掉了,你可以設置備用程序來繼續完成消息的處理.
如果Consumer本身邏輯有問題沒有發送ACK的處理,RabbitMQ不會再向該Consumer發送消息.RabbitMQ會認為這個Consumer還沒有處理完上一條消息,沒有能力繼續接收新消息.我們可以善加利用這一機制,如果需要處理過程是相當復雜的,應用程序可以延遲發送ACK直到處理完成為止.這可以有效控制應用程序這邊的負載,不致於被大量消息沖擊.
 
 
拒絕消息
由於要拒絕消息,所以ACK響應消息還沒有發出,所以這里拒絕消息可以有兩種選擇:
    1.Consumer直接斷開RabbitMQ 這樣RabbitMQ將把這條消息重新排隊,交由其它Consumer處理.這個方法在RabbitMQ各版本都支持.這樣做的壞處就是連接斷開增加了RabbitMQ的額外負擔,特別是consumer出現異常每條消息都無法正常處理的時候.
   2. RabbitMQ 2.0.0可以使用 basic.reject 命令,收到該命令RabbitMQ會重新投遞到其它的Consumer.如果設置requeue為false,RabbitMQ會直接將消息從queue中移除.
   其實還有一種選擇就是直接忽略這條消息並發送ACK,當你明確直到這條消息是異常的不會有Consumer能處理,可以這樣做拋棄異常數據.為什么要發送basic.reject消息而不是ACK?RabbitMQ后面的版本可能會引入"dead letter"隊列,如果想利用dead letter做點文章就使用basic.reject並設置requeue為false.
 
消息持久化
    消息的持久化需要在消息投遞的時候 設置delivery mode值為2 .由於消息實際存儲於queue之中,"皮之不存毛將焉附"邏輯上,消息持久化同時要求exchange和queue也是持久化的.這是消息持久化必須滿足的三個條件. 
     持久化的代價就是性能損失,磁盤IO遠遠慢於RAM(使用SSD會顯著提高消息持久化的性能) , 持久化會大大降低RabbitMQ每秒可處理的消息.兩者的性能差距可能在10倍以上.
 
消息恢復
   consumer從durable queue中取回一條消息之后並發回了ACK消息,RabbitMQ就會將其標記,方便后續垃圾回收.如果一條持久化的消息沒有被consumer取走,RabbitMQ重啟之后會自動重建exchange和queue(以及bingding關系),消息通過持久化日志重建再次進入對應的queues,exchanges.
 
皮之不存,毛將焉附?緊接着我們看看消息實際存放的地方:Queue

Queue

Queues是Massage的落腳點和等待接收的地方,消息除非被扔進黑洞否則就會被安置在一個Queue里面.Queue很適合做負載均衡,RabbitMQ可以在若干consumer中間實現輪流調度(Round-Robin).
 
如何創建隊列
   consumer和producer都可以創建Queue,如果consumer來創建,避免consumer訂閱一個不存在的Queue的情況,但是這里要承擔一種風險:消息已經投遞但是consumer尚未創建隊列,那么消息就會被扔到黑洞,換句話說消息丟了;避免這種情況的好辦法就是producer和consumer都嘗試創建一下queue. 如果consumer在已經訂閱了另外一個Queue的情況下無法完成新Queue的創建,必須取消之前的訂閱將Channel置為傳輸模式("transmit")才能創建新的Channel.
   創建Queue的時候通常要指定名字,名字方便consumer訂閱.即使你不指定Rabbit會給它分配一個隨機的名字,這在使用臨時匿名隊列完成RPC-over-AMQP調用時會非常有用.
   創建Queue的時候還有兩個非常有用的選項:
   exclusive—When set to true, your queue becomes private and can only be consumed by your app. This is useful when you need to limit a queue to only one consumer.
   auto-delete—The queue is automatically deleted when the last consumer unsubscribes.
 
   如果要創建只有一個consumer使用的臨時queue可以組合使用auto-delete和 exclusive.consumer一旦斷開連接該隊列自動刪除.
   重復創建Queue會怎樣?如果Queue創建的選項完全一致的話,RabbitMQ直接返回成功,如果名稱相同但是創建選項不一致就會返回創建失敗.如果是想檢查Queue是否存在,可以設置queue.declare命令的passive 選項為true:如果隊列存在就會返回成功,如果隊列不存在會報錯且不會執行創建邏輯.
 
消息是如何從動態路由到不同的隊列的?這就看下面的內容了
 

bindings and exchanges

消息如何發送到隊列
 
     消息是如何發送到隊列的?這就要說到AMQP  bindings and exchanges. 投遞消息到queue都是經由exchange完成的,和生活中的郵件投遞一樣也需要遵循一定的規則,在RabbitMQ中規則是通過routing key把queue綁定到exchange上,這種綁定關系即binding.消息發送到RabbitMQ都會攜帶一個routing key(哪怕是空的key),RabbitMQ會根據bindings匹配routing key,如果匹配成功消息會轉發到指定Queue,如果沒有匹配到queue消息就會被扔到黑洞.
 
如何發送到多個隊列
 
  消息是分發到多個隊列的?AMQP協議里面定義了幾種不同類型的exchange:direct, fanout, topic, and headers. 每一種都實現了一種 routing 算法. header的路由消息並不依賴routing key而是去匹配AMQP消息的header部分,這和下面提到的direct exchange如出一轍,但是性能要差很多,在實際場景中幾乎不會被用到.
 
direct exchange  routing key完全匹配才轉發
fanout exchange 不理會routing key,消息直接廣播到所有綁定的queue 
topic exchange  對routing key模式匹配
 
 
exchange持久化
創建queue和exchange默認情況下都是沒有持久化的,節點重啟之后queue和exchange就會消失,這里需要特別指定queue和exchange的durable屬性.
 
Consumer是直接創建TCP鏈接到RabbitMQ嗎?下面就是答案:

Channel

無論是要發布消息還是要獲取消息 ,應用程序都需要通過TCP連接到RabbitMQ.應用程序連接並通過權限認證之后就要創建Channel來執行AMQP命令.Channel是建立在實際TCP連接之上通信管道,這里之所以引入channel的概念而不是直接通過TCP鏈接直接發送AMQP命令,是出於兩方面的考慮:建立上成百上千的TCP鏈接,一方面浪費了TCP鏈接,一方面很快會觸及系統瓶頸.引入了Channel之后多個進程與RabbitMQ的通信可以在一條TCP鏈接上完成.我們可以把TCP類比做光纜,那么Channel就像光纜中的一根根光纖.

參考資料

https://www.cnblogs.com/linkenpark/p/5393666.html


摘要:本文介紹了rabbitMq,提供了如何在Ubuntu下安裝RabbitMQ 服務的方法。最后以RabbitMQ與java、Spring結合的兩個實例來演示如何使用RabbitMQ。

本文工程免費下載

一、rabbitMQ簡介

1.1、rabbitMQ的優點(適用范圍)
1. 基於erlang語言開發具有高可用高並發的優點,適合集群服務器。
2. 健壯、穩定、易用、跨平台、支持多種語言、文檔齊全。
3. 有消息確認機制和持久化機制,可靠性高。
4. 開源
其他MQ的優勢:
1. Apache ActiveMQ曝光率最高,但是可能會丟消息。
2. ZeroMQ延遲很低、支持靈活拓撲,但是不支持消息持久化和崩潰恢復。

1.2、幾個概念說明
producer&Consumer
producer指的是消息生產者,consumer消息的消費者。
Queue
消息隊列,提供了FIFO的處理機制,具有緩存消息的能力。rabbitmq中,隊列消息可以設置為持久化,臨時或者自動刪除。
設置為持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失
設置為臨時隊列,queue中的數據在系統重啟之后就會丟失
設置為自動刪除的隊列,當不存在用戶連接到server,隊列中的數據會被自動刪除Exchange

Exchange類似於數據通信網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別:
Direct
直接交換器,工作方式類似於單播,Exchange會將消息發送完全匹配ROUTING_KEY的Queue
fanout
廣播是式交換器,不管消息的ROUTING_KEY設置為什么,Exchange都會將消息轉發給所有綁定的Queue。
topic
主題交換器,工作方式類似於組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的所有隊列,比如,ROUTING_KEY為user.stock的Message會轉發給綁定匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的隊列。( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)
headers
消息體的header匹配(ignore)
Binding
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系。
virtual host
在rabbitmq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。producer和consumer連接rabbit server需要指定一個vhost。

1.3、消息隊列的使用過程
1. 客戶端連接到消息隊列服務器,打開一個channel。
2. 客戶端聲明一個exchange,並設置相關屬性。
3. 客戶端聲明一個queue,並設置相關屬性。
4. 客戶端使用routing key,在exchange和queue之間建立好綁定關系。
5. 客戶端投遞消息到exchange。
6. exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里

 

二、環境配置與安裝

1、Erlang環境安裝
RabbitMQ是基於Erlang的,所以首先必須配置Erlang環境。
從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,我下載的版本是 otp_src_R14B03.tar.gz 。
然后:

$ tar xvzf otp_src_R14B03.tar.gz  
$ cd otp_src_R14B03  
$ ./configure  
編譯后的輸出

如下圖:

注:
可能會報錯 configure: error: No curses library functions found
configure: error: /bin/sh '/home/liyixiang/erlang/configure' failed for erts

 

 

原因是缺少ncurses包

解決:在ubuntu系統下
apt-cache search ncurses  
apt-get install libncurses5-dev  
然后重新執行
./configure  
提示沒有wxWidgets和fop、ssh、odbc、ssl,但是問題不大。繼續:
make  
然后:
sudo make install  

配置erlang環境變量 
修改/etc/profile文件,增加下面的環境變量:(vim profile i插入 編輯完畢ESC退出 wq!強制修改)

#set erlang environment  
export PATH=$PATH:/usr/erlang/bin:$PATH  
source profile使得文件生效  

 

下面是我的

2、RabbitMQ-Server安裝

安裝完Erlang,開始安裝RabbitMQ-Server。安裝方法有三種,這里筆者三者都試過了,就只有以下這個方法成功了。

直接使用:

apt-get  install rabbitmq-server  

安裝完成后會自動打開:

使用命令查看rabbitmq運行狀態:

rabbitmqctl status  

 

停止

rabbitmqctl stop  

開啟

rabbitmq-server start  

3、rabbitmq web管理頁面插件安裝

輸入以下命令

cd /usr/lib/rabbitmq/bin/  
rabbitmq-plugins enable rabbitmq_management

 

這里筆者一直安裝不成功。

 

如果安裝成功打開瀏覽器,輸入 http://[server-name]:15672/ 如 http://localhost:15672/ ,會要求輸入用戶名和密碼,用默認的guest/guest即可(guest/guest用戶只能從localhost地址登錄,如果要配置遠程登錄,必須另創建用戶)。
如果要從遠程登錄怎么做呢?處於安全考慮,guest這個默認的用戶只能通過http://localhost:15672來登錄,其他的IP無法直接用這個guest帳號。
這里我們可以通過配置文件來實現從遠程登錄管理界面,只要編輯/etc/rabbitmq/rabbitmq.config文件(沒有就新增),添加以下配置就可以了。

4、添加用戶

vim /etc/rabbitmq/rabbitmq.config  

然后添加

[   
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}   
]
注意上面有個點號

現在添加了一個新授權用戶asdf,可以遠程使用這個用戶名。記得要先用命令添加這個命令才行:
cd /usr/lib/rabbitmq/bin/  

 

#用戶名與密碼
sudo rabbitmqctl add_user asdf 123456  

 

用戶設置為administrator才能遠程訪問
sudo rabbitmqctl set_user_tags asdf administrator   
sudo rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"  
其實也可以通過管理平台頁面直接添加用戶和密碼等信息。如果還不能遠程訪問或遠程登錄檢查是不是5672, 15672端口沒有開放!!!!!!

5、開放端口

ufw allow 5672  

三、簡單Java實例

下面來演示一個使用java的簡單實例:
1、首先是消息生產者和提供者的基類
package com.lin;  
  
import java.io.IOException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
   
/** 
 *  
 * 功能概要: EndPoint類型的隊列 
 *  
 * @author linbingwen 
 * @since  2016年1月11日 
 */  
public abstract class EndPoint{  
       
    protected Channel channel;  
    protected Connection connection;  
    protected String endPointName;  
       
    public EndPoint(String endpointName) throws IOException{  
         this.endPointName = endpointName;  
           
         //Create a connection factory  
         ConnectionFactory factory = new ConnectionFactory();  
           
         //hostname of your rabbitmq server  
         factory.setHost("10.75.4.25");  
         factory.setPort(5672);  
         factory.setUsername("asdf");  
         factory.setPassword("123456");  
           
         //getting a connection  
         connection = factory.newConnection();  
           
         //creating a channel  
         channel = connection.createChannel();  
           
         //declaring a queue for this channel. If queue does not exist,  
         //it will be created on the server.  
         channel.queueDeclare(endpointName, false, false, false, null);  
    }  
       
       
    /** 
     * 關閉channel和connection。並非必須,因為隱含是自動調用的。 
     * @throws IOException 
     */  
     public void close() throws IOException{  
         this.channel.close();  
         this.connection.close();  
     }  
}  

 

2、消息提供者
package com.lin.producer;  
  
import java.io.IOException;  
import java.io.Serializable;  
  
import org.apache.commons.lang.SerializationUtils;  
  
import com.lin.EndPoint;  
   
   
/** 
 *  
 * 功能概要:消息生產者 
 *  
 * @author linbingwen 
 * @since  2016年1月11日 
 */  
public class Producer extends EndPoint{  
       
    public Producer(String endPointName) throws IOException{  
        super(endPointName);  
    }  
   
    public void sendMessage(Serializable object) throws IOException {  
        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));  
    }    
}  

 

3、消息消費者
package com.lin.consumer;  
  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Map;  
  
import org.apache.commons.lang.SerializationUtils;  
  
import com.lin.EndPoint;  
import com.rabbitmq.client.AMQP.BasicProperties;  
import com.rabbitmq.client.Consumer;  
import com.rabbitmq.client.Envelope;  
import com.rabbitmq.client.ShutdownSignalException;  
   
   
/** 
 *  
 * 功能概要:讀取隊列的程序端,實現了Runnable接口 
 *  
 * @author linbingwen 
 * @since  2016年1月11日 
 */  
public class QueueConsumer extends EndPoint implements Runnable, Consumer{  
       
    public QueueConsumer(String endPointName) throws IOException{  
        super(endPointName);         
    }  
       
    public void run() {  
        try {  
            //start consuming messages. Auto acknowledge messages.  
            channel.basicConsume(endPointName, true,this);  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
   
    /** 
     * Called when consumer is registered. 
     */  
    public void handleConsumeOk(String consumerTag) {  
        System.out.println("Consumer "+consumerTag +" registered");      
    }  
   
    /** 
     * Called when new message is available. 
     */  
    public void handleDelivery(String consumerTag, Envelope env,  
            BasicProperties props, byte[] body) throws IOException {  
        Map map = (HashMap)SerializationUtils.deserialize(body);  
        System.out.println("Message Number "+ map.get("message number") + " received.");  
           
    }  
   
    public void handleCancel(String consumerTag) {}  
    public void handleCancelOk(String consumerTag) {}  
    public void handleRecoverOk(String consumerTag) {}  
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}  
}  

 

4、測試
package com.lin.test;  
  
import java.io.IOException;  
import java.sql.SQLException;  
import java.util.HashMap;  
  
import com.lin.consumer.QueueConsumer;  
import com.lin.producer.Producer;  
   
public class Test {  
    public Test() throws Exception{  
           
        QueueConsumer consumer = new QueueConsumer("queue");  
        Thread consumerThread = new Thread(consumer);  
        consumerThread.start();  
           
        Producer producer = new Producer("queue");  
           
        for (int i = 0; i < 1000000; i++) {  
            HashMap message = new HashMap();  
            message.put("message number", i);  
            producer.sendMessage(message);  
            System.out.println("Message Number "+ i +" sent.");  
        }  
    }  
       
    /** 
     * @param args 
     * @throws SQLException 
     * @throws IOException 
     */  
    public static void main(String[] args) throws Exception{  
      new Test();  
    }  
}  

 

其中引入的jar包:
<!-- rabbitmq客戶端 -->  
<dependencies>  
    <dependency>  
        <groupId>com.rabbitmq</groupId>  
        <artifactId>amqp-client</artifactId>  
        <version>3.0.4</version>  
    </dependency>  
  
<dependency>  
    <groupId>commons-lang</groupId>  
    <artifactId>commons-lang</artifactId>  
    <version>2.6</version>  
</dependency>  
<dependency>  
    <groupId>org.apache.commons</groupId>  
    <artifactId>commons-lang3</artifactId>  
    <version>3.1</version>  
</dependency>  
</dependencies>  

 

測試結果:
在提供消息
在消費消息
然后同時打開rabbitmq的服務端,輸入如下:
rabbitmqctl list_queues  

 

這個命令是用來查看服務端中有多處個消息隊列的。
可以看到有個名為queue的消息隊列(更好的方法是安裝好web監控插件,筆者一直安裝失敗,所以這里就不展示了)
 
 

四、Rbbitmq與Spring結合使用

首先建立一個maven工程,整個項目的結構如下:
下面將具體來講講整個過程
1、jar包的引入
pom.xml配置即可,如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
    <modelVersion>4.0.0</modelVersion>  
    <groupId>com.lin</groupId>  
    <artifactId>rabbit_c2</artifactId>  
    <version>0.0.1-SNAPSHOT</version>  
    <properties>  
        <!-- spring版本號 -->  
        <spring.version>3.2.8.RELEASE</spring.version>  
        <!-- log4j日志文件管理包版本 -->  
        <slf4j.version>1.6.6</slf4j.version>  
        <log4j.version>1.2.12</log4j.version>  
        <!-- junit版本號 -->  
        <junit.version>4.10</junit.version>  
    </properties>  
  
    <dependencies>  
        <!-- 添加Spring依賴 -->  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-core</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-webmvc</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-context</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-context-support</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-aop</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-aspects</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-tx</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-jdbc</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-web</artifactId>  
            <version>${spring.version}</version>  
        </dependency>  
  
        <!--單元測試依賴 -->  
        <dependency>  
            <groupId>junit</groupId>  
            <artifactId>junit</artifactId>  
            <version>${junit.version}</version>  
            <scope>test</scope>  
        </dependency>  
  
        <!-- 日志文件管理包 -->  
        <!-- log start -->  
        <dependency>  
            <groupId>log4j</groupId>  
            <artifactId>log4j</artifactId>  
            <version>${log4j.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.slf4j</groupId>  
            <artifactId>slf4j-api</artifactId>  
            <version>${slf4j.version}</version>  
        </dependency>  
        <dependency>  
            <groupId>org.slf4j</groupId>  
            <artifactId>slf4j-log4j12</artifactId>  
            <version>${slf4j.version}</version>  
        </dependency>  
        <!-- log end -->  
  
        <!--spring單元測試依賴 -->  
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-test</artifactId>  
            <version>${spring.version}</version>  
            <scope>test</scope>  
        </dependency>  
  
        <!--rabbitmq依賴 -->  
        <dependency>  
            <groupId>org.springframework.amqp</groupId>  
            <artifactId>spring-rabbit</artifactId>  
            <version>1.3.5.RELEASE</version>  
        </dependency>  
  
        <dependency>  
            <groupId>javax.validation</groupId>  
            <artifactId>validation-api</artifactId>  
            <version>1.1.0.Final</version>  
        </dependency>  
  
        <dependency>  
            <groupId>org.hibernate</groupId>  
            <artifactId>hibernate-validator</artifactId>  
            <version>5.0.1.Final</version>  
        </dependency>  
  
    </dependencies>  
    <build>  
        <resources>  
            <resource>  
                <directory>src/main/resources</directory>  
                <targetPath>${basedir}/target/classes</targetPath>  
                <includes>  
                    <include>**/*.properties</include>  
                    <include>**/*.xml</include>  
                </includes>  
                <filtering>true</filtering>  
            </resource>  
            <resource>  
                <directory>src/main/resources</directory>  
                <targetPath>${basedir}/target/resources</targetPath>  
                <includes>  
                    <include>**/*.properties</include>  
                    <include>**/*.xml</include>  
                </includes>  
                <filtering>true</filtering>  
            </resource>  
        </resources>  
  
        <plugins>  
            <plugin>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-compiler-plugin</artifactId>  
                <configuration>  
                    <source>1.6</source>  
                    <target>1.6</target>  
                    <encoding>UTF-8</encoding>  
                </configuration>  
            </plugin>  
            <plugin>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-war-plugin</artifactId>  
                <version>2.1.1</version>  
                <configuration>  
                    <warSourceExcludes>${warExcludes}</warSourceExcludes>  
                </configuration>  
            </plugin>  
            <plugin>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-surefire-plugin</artifactId>  
                <version>2.4.3</version>  
                <configuration>  
                    <testFailureIgnore>true</testFailureIgnore>  
                </configuration>  
            </plugin>  
            <plugin>  
                <inherited>true</inherited>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-source-plugin</artifactId>  
                <executions>  
                    <execution>  
                        <id>attach-sources</id>  
                        <goals>  
                            <goal>jar</goal>  
                        </goals>  
                    </execution>  
                </executions>  
            </plugin>  
            <plugin>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-resources-plugin</artifactId>  
                <configuration>  
                    <encoding>UTF-8</encoding>  
                </configuration>  
            </plugin>  
        </plugins>  
    </build>  
</project> 

 

2、消息生產者
package com.lin.producer;  
  
import javax.annotation.Resource;  
  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.amqp.core.AmqpTemplate;  
import org.springframework.stereotype.Service;  
  
/** 
 * 功能概要:消息產生,提交到隊列中去 
 */  
@Service  
public class MessageProducer {  
      
    private Logger logger = LoggerFactory.getLogger(MessageProducer.class);  
  
    @Resource  
    private AmqpTemplate amqpTemplate;  
  
    public void sendMessage(Object message){  
      logger.info("to send message:{}",message);  
      amqpTemplate.convertAndSend("queueTestKey",message);  
    }  
}  

 

3、消息消費者
package com.lin.consumer;  
  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.MessageListener;  
  
/** 
 * 功能概要:消費接收 
 *  
 * @author linbingwen 
 * @since  2016年1月15日  
 */  
public class MessageConsumer implements MessageListener {  
      
    private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);  
  
    @Override  
    public void onMessage(Message message) {  
        logger.info("receive message:{}",message);  
    }  
  
}  

 

4、rabbitMq.xml配置信息
<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
     http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
     http://www.springframework.org/schema/rabbit  
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">  
    <!--配置connection-factory,指定連接rabbit server參數 -->  
    <rabbit:connection-factory id="connectionFactory"  
        username="asdf" password="123456" host="10.75.4.25" port="5672" />  
          
    <!--定義rabbit template用於數據的接收和發送 -->  
    <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"   
        exchange="exchangeTest" />  
          
    <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->  
    <rabbit:admin connection-factory="connectionFactory" />  
  
    <!--定義queue -->  
    <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />  
  
    <!-- 定義direct exchange,綁定queueTest -->  
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">  
        <rabbit:bindings>  
            <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>  
        </rabbit:bindings>  
    </rabbit:direct-exchange>  
      
    <!-- 消息接收者 -->  
    <bean id="messageReceiver" class="com.lin.consumer.MessageConsumer"></bean>  
      
    <!-- queue litener  觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->  
    <rabbit:listener-container connection-factory="connectionFactory">  
             <rabbit:listener queues="queueTest" ref="messageReceiver"/>  
    </rabbit:listener-container>  
      
</beans>  

 

5、spring集成rabbiqMq。application.xml內容如下:
<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:p="http://www.springframework.org/schema/p"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">  
  
  
    <import resource="classpath*:rabbitmq.xml" />  
      
      
    <!-- 掃描指定package下所有帶有如@controller,@services,@resource,@ods並把所注釋的注冊為Spring Beans -->  
    <context:component-scan base-package="com.lin.consumer,com.lin.producer" />  
      
  
    <!-- 激活annotation功能 -->  
    <context:annotation-config />  
    <!-- 激活annotation功能 -->  
    <context:spring-configured />  
  
</beans>  

 

6、最后,為了方便,打印了日志,log4j.properties配置如下
log4j.rootLogger=DEBUG,Console,Stdout  
  
#Console  
log4j.appender.Console=org.apache.log4j.ConsoleAppender  
log4j.appender.Console.layout=org.apache.log4j.PatternLayout  
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n  
  
log4j.logger.java.sql.ResultSet=INFO  
log4j.logger.org.apache=INFO  
log4j.logger.java.sql.Connection=DEBUG  
log4j.logger.java.sql.Statement=DEBUG  
log4j.logger.java.sql.PreparedStatement=DEBUG   
  
log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender    
log4j.appender.Stdout.File = E://logs/log.log    
log4j.appender.Stdout.Append = true    
log4j.appender.Stdout.Threshold = DEBUG     
log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout    
log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n    

 


接着運行整個工程即可:
下面是運行的結果:
 
一會發一會收:因為在同一工程,所以發消息和接消息是交替出現的
 
我們出可以去rabbitMq 服務器上看:
可以看到,我們配置的隊列已存在了:
到此,整個工程結束。
 
http://blog.csdn.net/evankaka/article/details/50495437
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM