一個winform帶你玩轉rabbitMQ


源碼已放出 https://github.com/dubing/MaoyaRabbit

本章分3部分

一、安裝部署初探

二、進階

三、api相關


安裝 部署 初探

先上圖


一. 安裝部署

  下載 rabbitMQ :http://www.rabbitmq.com/download.html

  安裝rabbitmq需要erlang,下載erlang:http://www.erlang.org/download.html

  按照官網按照步驟,例如windows http://www.rabbitmq.com/install-windows.html 

  安裝完rabbitMQ可以再啟動插件擴展,其中包含了一個管理后台

  

  最新版本的后台地址為 http://localhost:15672/ 

  

  用戶名和密碼都為guest,輸入完成進入主菜單

  

  功能很豐富,可以查看當前服務器的交換機,隊列,消息,連接,會話等得使用情況。

  基本上到這里服務器的安裝部署環節算是ok,很簡單。


 

二.  簡介

  要了解rabbitMQ 首先要了解AMQP協議 百科上給的很詳細 http://baike.baidu.com/view/4023136.htm?fr=aladdin

  AMQP 有四個非常重要的概念:虛擬機(virtual host),通道(exchange),隊列(queue)和綁定(binding)。

  虛擬機: 通常是應用的外在邊界,我們可以為不同的虛擬機分配訪問權限。虛擬機可持有多個交換機、隊列和綁定。
  交換機: 從連接通道(Channel)接收消息,並按照特定的路由規則發送給隊列。
  隊列: 消息最終的存儲容器,直到消費客戶端(Consumer)將其取走。
  綁定: 也就是所謂的路由規則,告訴交換機將何種類型的消息發送到某個隊列中。

  這個概念很重要 不然在學習rabbitmq的地方會碰到很多困難。想要進階學習的可以參考 https://www.rabbitmq.com/tutorials/amqp-concepts.html

  借用官方一個圖來闡述AMQP

  

  RabbitMQ是一個消息代理。它的核心原理非常簡單:接收和發送消息。

  你可以把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ是一個郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區別是,它處理的不是紙,而是接收、存儲和發送二進制的數據——消息。

  對於rabbitMQ本身的特點 參考官網 http://www.rabbitmq.com/features.html

  1、可靠性(Reliability)
  RabbitMQ提供很多特性供我們可以在性能和可靠性作出折中的選擇,包括持久化、發送確認、發布者確認和高可用性等。
  2、彈性選路(Flexible Routing)
  消息在到達隊列前通過交換(exchanges)來被選路。RabbitMQ為典型的選路邏輯設計了幾個內置的交換類型。對於更加復雜的選路,我們可以將exchanges綁定在一起或者寫屬於自己的exchange類型插件。
  3、集群化(Clustering)
  在一個局域網內的幾個RabbitMQ服務器可以集群起來,組成一個邏輯的代理人。
  4、聯盟(Federation)
  對於那些需要比集群更加松散和非可靠連接的服務器來說,RabbitMQ提供一個聯盟模型(Federation Model)
  5、高可用隊列(High Available Queue)
  可以在一個集群里的幾個機器里對隊列做鏡像,確保即時發生了硬件失效,你的消息也是安全的。
  6、多客戶端(Many Clients)
  有各種語言的RabbitMQ客戶端
  7、管理UI(Management UI)
  RabbitMQ提供一個易用的管理UI來監控和控制消息代理人的各個方面。
  8、跟蹤(Tracing)
  如果你的消息系統行為異常,RabbitMQ提供跟蹤支持來找出錯誤的根源。
  9、插件系統(Plugin System)
  RabbitMQ提供各種方式的插件擴展,我們可以實現自己的插件。

  使用任務隊列一個優點是能夠輕易地並行處理任務。當處理大量積壓的任務,只要增加工作隊列,通過這個方式,能夠實現輕易的縮放。


 

三. 初探

  文中的winform所采取的client為官方的.net版本 https://github.com/rabbitmq/rabbitmq-dotnet-client

  首先是Connection和Channel的概念

  Connection 建立與rabbitmq server的一個連接,由ConnectionFactory創建,Channel建立在connection基礎上的一個頻道,相對於connection來說,它是輕量級的。可以理解成一次會話。

  代碼示例 本機環境

                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
			//do something
                    }
                }

  exchange常用有三種類型:

  Direct :處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。
  Fanout :不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。
  Topic : 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。

  還有一種多重屬性的類型headers,我們在下一章節討論。

  我們用winform分別造成三種類型的exchange來實際體驗一下

  

  這里所謂的限定exchange是在我們安裝rabbitmq server的時候自動生成的一些 我們的測試不使用這些exchange。

  然后我們新建3個Queue,這里我們會發現一個有趣的現象,rabbitmq server對於新生成的隊列都會默認綁定在一個名稱為“”的默認exchange上。

  先試試direct類型,下面我們分別把Q1,Q2,Q3根據路由key為空,k1,k.#綁定在dEx上(direct exchange)。
    

  然后我們根據路由key為空,k,k1,k2,k3來發送消息m1,m2,m3,m4,m5
    

  再用3個隊列接收消息試一下結果
  

  因為發送確認標記ack,所以隊列上讀取過的消息會被刪除,為了進一步認證,我在結尾又添加了一個routingkey為k.#的消息(對應綁定Q3),由圖可見direct 模式下隊列之收取他們完全對應的routingkey消息。

  下面我們再試一下fanout類型,把Q1,Q2,Q3根據路由key為空,k1,k.#綁定在fEx上(fanout exchange)。

  
  同上步驟建立綁定關系

  

  生產消息,然后看下隊列接受消息的情況
  

  效果很明顯,fanout為廣播模式。

  再試試topic類型 把Q1,Q2,Q3根據路由key為空,k1,k.#綁定在tEx上(topic exchange)。

  

  推送消息
  

  接收消息
  
  通過3種模式 3個隊列的消息讀取 大家應該了解了這3中模式的區別。


進階

一.  exchange屬性

  Type

  前一章我們說了exchange的類型分為fanout,direct,topic.還有一種不常用的headers。
  headers這種類型的exchange綁定的時候會忽略掉routingkey,Headers是一個鍵值對,可以定義成成字典等。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。之前的幾種exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型
  舉個例子,發送端定義2個鍵值{k1,1},{k2,2},接收端綁定隊列的時候定義{"x-match", "any"},那么接收端的鍵值屬性里只要存在{k1,1}或{k2,2}都可以獲取到消息。
  這樣的類型擴展的程度很大,適合非常復雜的業務場景。

  Durability

  持久性,這是exchange的可選屬性,如果你Durability設置為false,那些當前會話結束的時候,該exchange也會被銷毀。 
  新建一個transient exchange 
  
  關閉當前連接再查看一下
  

  

  剛才我們新建的transient已經銷毀了。

  Auto delete

  當沒有隊列或者其他exchange綁定到此exchange的時候,該exchange被銷毀。這個很簡單就不示例了。

  Internal (比較簡單 也不展示了)

  表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。

  PS: 無法聲明2個名稱相同 但是類型卻不同的exchange

  


二.  Queue屬性  

  Durability 和exchange相同,未持久化的隊列,服務重啟后銷毀。

  Auto delete 當沒有消費者連接到該隊列的時候,隊列自動銷毀。

  Exclusive 使隊列成為私有隊列,只有當前應用程序可用,當你需要限制隊列只有一個消費者,這是很有用的。

  擴展屬性如下對應源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的參數

  Message TTL 當一個消息被推送在該隊列的時候 可以存在的時間 單位為ms,(對應擴展參數argument "x-message-ttl" )

  Auto expire 在隊列自動刪除之前可以保留多長時間(對應擴展參數argument "x-expires")

  Max length 一個隊列可以容納的已准備消息的數量(對應擴展參數argument "x-max-length")

  ... 更多參考 http://www.rabbitmq.com/extensions.html

  ps:一旦創建了隊列和交換機,就不能修改其標志了。例如,如果創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現創建。


三.  Message屬性

  Durability 

  消息的持久在代碼中設置的方法與exchange和queue不同,有2種方法

  1.

  IBasicProperties properties = channel.CreateBasicProperties();
  properties.SetPersistent(true);
  byte[] payload = Encoding.ASCII.GetBytes(message);
  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  2.

  IBasicProperties properties = channel.CreateBasicProperties();
  properties.DeliveryMode = 2;
  byte[] payload = Encoding.ASCII.GetBytes(message);
  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  contentType: 標識消息內容的MIME,例如JSON用application/json

  replayTo: 標識回調的queue的地址

  correlationId:用於request和response的關聯,確保消息的請求和響應的同一性

  Message的2種狀態:

  Ready

  此狀態的消息存在於隊列中待處理。

  Unacknowledged

  此狀態的消息表示已經在處理未確認。

  說到Unacknowledged,這里需要了解一個ack的概念。當Consumer接收到消息、處理任務完成之后,會發送帶有這個消息標示符的ack,來告訴server這個消息接收到並處理完成。RabbitMQ會一直等到處理某個消息的Consumer的鏈接失去之后,才確定這個消息沒有正確處理,從而RabbitMQ重發這個消息。
  Message acknowledgment是默認關閉的。初始化Consumer時有個noAck參數,如果設置為true,這個Consumer在收到消息之后會馬上返回ack。

  string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)

  一般來說,常用的場景noack一般就是設置成true,但是對於風險要求比較高的項目,例如支付。對於每一條消息我們都需要保證他的完整性和正確性。就需要獲取消息后確認執行完正確的業務邏輯后再主動返回一個ack給server。可以通過rabbitmqctl list_queues name message_rady message_unacknowleded 命令來查看隊列中的消息情況,也可以通過后台管理界面。

  我們先hold住一條消息

  

  然后我們再關閉鏈接或者重啟服務

  
  數據還是完整的。 

  ps:message的消費還分為consume和baseget 下面講到集群的時候再介紹。


四.  binding相關

  如果你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。類似的,如果刪除了某個隊列或交換機(無論是不是 durable),依賴它的綁定都會自動刪除。

  在聲明一個隊列的同時,server會默認讓此隊列綁定在默認的exchange上,這個exchange的名稱為空。

   


 五.  發布訂閱

  我們上一章的demo中實際上已經使用了發布訂閱模式。

  RabbitMQ消息模型的核心理念是:發布者(producer)不會直接發送任何消息給隊列。事實上,發布者(producer)甚至不知道消息是否已經被投遞到隊列。發布者(producer)只需要把消息發送給一個exchange。exchange非常簡單,它一邊從發布者方接收消息,一邊把消息推入隊列。exchange必須知道如何處理它接收到的消息,是應該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。這些規則是通過exchange type來定義的。

  

  發布訂閱其實很簡單,例如上章我所示例,假設我們一開始沒有任何消息,現在有一個生產者P1,他是一個天氣預報播放者。然后我們有2個消費者來訂閱他的消息。
  P1通過廣播類型的交換機fEx來發布他的天氣消息,c1,c2分別建立一個隊列為Q1,Q2. 並且訂閱P1的fEx.

  基本可以如圖所示
  
  我們P1利用fEx生成一條消息的時候,c1,c2通過Q1,Q2都可以獲取到p1所發布的消息

  我們發布3條消息
  
  查看隊列情況
  Q1:
  
  Q2:

  

  Q1,Q2都拿到了廣播的消息,至於C1,C2如何消費這些消息,互相之間完全沒有干擾。

  ps:簡單一句話 發布訂閱中發布者所產生的消息通過exchange對所有綁定他的隊列隊形消息推送,每個隊列獲取綁定所對應的消息


六.  WorkQueue (可用於消費者集群)

  區分於發布訂閱,消費者集群主要解決橫向服務器擴展問題,如果一個隊列積壓太多,如何均與的讓不同的消費者來承擔。

  

  默認來說,RabbitMQ會按順序得把消息發送給每個消費者(consumer)。平均每個消費者都會收到同等數量得消息。這種發送消息得方式叫做——輪詢(round-robin)。

  我們開3個程序,1個生產 2個消費。

  如圖所示綁定關系如下

  

  2個消費者用同樣的程序,這里記錄進程pid以區分,實際項目中可以用不同服務器來區分

  

   啟動消息消費,使消費者處理work狀態

  

  然后我們不停的通過生產者這發布消息

  

  然后我們看下2個消費者的消費情況

  1.

  

  2.
  

  3.
  

  4.
  

  5.
  
  

  默認地,RabbitMQ會逐一地向下一個Consumer發放消息,每一個Consumer會得到數目相同的消息。文中所示之所以是按照1條一條的輪詢,是因為程序中控制了一個隊列單次消費的數量。

  void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)


API CommandLine 以及其他功能

RabbitMQ API

  RabbitMQ Server提供了豐富的http api。

  舉個列子

  

  需要HTTP基本身份驗證。默認的用戶名/密碼為guest/guest。

  這些返回值得意義我從官網搬來解釋,為了避免翻譯的問題導致大家理解的誤差這里直接給出原文

cluster_name The name of the entire cluster, as set with rabbitmqctl set_cluster_name.
erlang_full_version A string with extended detail about the Erlang VM and how it was compiled, for the node connected to.
erlang_version A string with the Erlang version of the node connected to. As clusters should all run the same version this can be taken as representing the cluster.
exchange_types A list of all exchange types available.
listeners All (non-HTTP) network listeners for all nodes in the cluster. (See contexts in /api/nodes for HTTP).
management_version Version of the management plugin in use.
message_stats A message_stats object for everything the user can see - for all vhosts regardless of permissions in the case of monitoring and administrator users, and for all vhosts the user has access to for other users.
node The name of the cluster node this management plugin instance is running on.
object_totals An object containing global counts of all connections, channels, exchanges, queues and consumers, subject to the same visibility rules as for message_stats.
queue_totals An object containing sums of the messagesmessages_ready and messages_unacknowledged fields for all queues, again subject to the same visibility rules as for message_stats.
rabbitmq_version Version of RabbitMQ on the node which processed this request.
statistics_db_node Name of the cluster node hosting the management statistics database.
statistics_level Whether the node is running fine or coarse statistics.

  又或者通過api查詢虛擬主機
  
  許多api的URI需要一個虛擬主機路徑的一部分的名字,因為名字只有唯一在一個虛擬主機識別物體。作為默認的虛擬主機稱為“/”,這​​將需要被編碼為“%2F”。

  在我的demo程序中對應的api功能可以通過這里的功能來實現

  

  其更豐富的功能可以參考官網說明文檔 http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html

  以及 http://hg.rabbitmq.com/rabbitmq-management/raw-file/rabbitmq_v3_3_5/priv/www/api/index.html

  一般來說我們常用的我在應用程序中已經給出 例如查看所有隊列等

  


 RabbitMQ CommandLine

  除了豐富的http api,rabbitmq server自然也有其很全面命令行。

  例如查詢所有exchange。

  

  查詢所有隊列以及他們包含的消息數目

   

  rabbitmqctl更多的命令說明參考 http://www.rabbitmq.com/man/rabbitmqctl.1.man.html


Message的BasicGet於consume的區別

   consume的功能上一張介紹過,basicget更偏向於我們平時用過的其他類型的MessageQueue,它就是最基本的接受消息,consume的消費針對basicget來說屬於一個長連接於短連接的區別。

消費者關系一旦確定,基本上默認它就是在偵聽通道的消息是否在生產。而basicget則是由客戶端手動來控制。

  在demo中在下圖所示處區分

  

  如果你選擇了消費消息,那么基本上代碼層面是這樣來完成的

                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicQos(0, 1, false);
                    channel.BasicConsume(queue.name, rbAckTrue.Checked, consumer);
                    while (true)
                    {
                        var e = consumer.Queue.Dequeue();
                        MessageBox.Show(string.Format("隊列{0}獲取消息{1},線程id為{2}", queue.name, Encoding.ASCII.GetString(e.Body), Process.GetCurrentProcess().Id));
                        Thread.Sleep(1000);
                    }

本篇先到此,希望對大家有幫助
  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM