RabbitMQ 6種應用場景


http://www.rabbitmq.com/getstarted.html官網

最近業務需要使用Rabbitmq工作隊列實現任務的負載分發

1.1、什么是RabbitMQ?

RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息。

1.2、什么是AMQP?

  AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。它從生產者接收消息並遞送給消費者,在這個過程中,根據規則進行路由,緩存與持久化。

 而在AMQP中主要有兩個組件:Exchange 和 Queue (在 AMQP 1.0 里還會有變動),如下圖所示,綠色的 X 就是 Exchange ,紅色的是 Queue ,這兩者都在 Server 端,又稱作 Broker ,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,通常有 Producer 和 Consumer 兩種類型:

  idoall.org

1.3、RabbitMQ的基礎概念

  • Broker:簡單來說就是消息隊列服務器實體
  • Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列
  • Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列
  • Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來
  • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞
  • vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離
  • producer:消息生產者,就是投遞消息的程序
  • consumer:消息消費者,就是接受消息的程序
  • channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務

安裝配置過程,這里不贅述。

RabbitMQ有六種應用場景,這里做簡單介紹

應用場景1-“Hello Word”

一個P向queue發送一個message,一個C從該queue接收message並打印。

send.py 
producer,連接至RabbitMQ Server,聲明隊列,發送message,關閉連接,退出。

應用場景2-work queues

將耗時的消息處理通過隊列分配給多個consumer來處理,我們稱此處的consumer為worker,我們將此處的queue稱為Task Queue,其目的是為了避免資源密集型的task的同步處理,也即立即處理task並等待完成。相反,調度task使其稍后被處理。也即把task封裝進message並發送到task queue,worker進程在后台運行,從task queue取出task並執行job,若運行了多個worker,則task可在多個worker間分配。


new_task.py
建立連接,聲明隊列,發送可以模擬耗時任務的message,斷開連接、退出。

worker.py
建立連接,聲明隊列,不斷的接收message,處理任務,進行確認。

應用場景3-Publish/Subscribe

在應用場景2中一個message(task)僅被傳遞給了一個comsumer(worker)。現在我們設法將一個message傳遞給多個consumer。這種模式被稱為publish/subscribe。此處以一個簡單的日志系統為例進行說明。該系統包含一個log發送程序和一個log接收並打印的程序。由log發送者發送到queue的消息可以被所有運行的log接收者接收。因此,我們可以運行一個log接收者直接在屏幕上顯示log,同時運行另一個log接收者將log寫入磁盤文件。

 


receive_logs.py
日志消息接收者:建立連接,聲明exchange,將exchange與queue進行綁定,開始不停的接收log並打印。

emit_log.py
日志消息發送者:建立連接,聲明fanout類型的exchange,通過exchage向queue發送日志消息,消息被廣播給所有接收者,關閉連接,退出。

應用場景4-Routing

應用場景3中構建了簡單的log系統,可以將log message廣播至多個receiver。現在我們將考慮只把指定的message類型發送給其subscriber,比如,只把error message寫到log file而將所有log message顯示在控制台。

應用場景5-topic

應用場景4中改進的log系統中用direct類型的exchange替換應用場景3中的fanout類型exchange實現將不同的log message發送給不同的subscriber(也即分別通過不同的routing_key將queue綁定到exchange,這樣exchange便可將不同的message根據message內容路由至不同的queue)。但仍然存在限制,不能根據多個規則路由消息,比如接收者要么只能收error類型的log message要么只能收info類型的message。如果我們不僅想根據log的重要級別如info、warning、error等來進行log message路由還想同時根據log message的來源如auth、cron、kern來進行路由。為了達到此目的,需要topic類型的exchange。topic類型的exchange中routing_key中可以包含兩個特殊字符:“*”用於替代一個詞,“#”用於0個或多個詞。

receive_logs_topic.py
log message接收者:建立連接,聲明topic類型的exchange,聲明queue,根據程序參數構造routing_key,根據routing_key將queue綁定到exchange,循環接收並處理message。

emit_log_topic.py
log message發送者:建立連接、聲明topic類型的exchange、根據程序參數構建routing_key和要發送的message,以構建的routing_key將message發送給topic類型的exchange,關閉連接,退出。

應用場景6-PRC

在應用場景2中描述了如何使用work queue將耗時的task分配到不同的worker中。但是,如果我們task是想在遠程的計算機上運行一個函數並等待返回結果呢。這根場景2中的描述是一個完全不同的故事。這一模式被稱為遠程過程調用。現在,我們將構建一個RPC系統,包含一個client和可擴展的RPC server,通過返回斐波那契數來模擬RPC service。

rpc_server.py
RPC server:建立連接,聲明queue,定義了一個返回指定數字的斐波那契數的函數,定義了一個回調函數在接收到包含參數的調用請求后調用自己的返回斐波那契數的函數並將結果發送到與接收到message的queue相關聯的queue,並進行確認。開始接收調用請求並用回調函數進行請求處理。

rpc_client.py
RPC client:遠程過程調用發起者:定義了一個類,類中初始化到RabbitMQ Server的連接、聲明回調queue、開始在回調queue上等待接收響應、定義了在回調queue上接收到響應后的處理函數on_response根據響應關聯的correlation_id屬性作出響應、定義了調用函數並在其中向調用queue發送包含correlation_id等屬性的調用請求、初始化一個client實例,以30為參數發起遠程過程調用。

你可以根據自己想需要進行應用,具體代碼可以去文章頂部的官網查看

本文參考:http://blog.csdn.net/zyz511919766/article/details/41946521


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM