RabbitMQ消息隊列(三): 發布/訂閱


1. 訂閱/發布:

前面worker示例中的每個任務都是只發送給某一個worker,如果我們多個worker都需要接收處理同一個任務,此時就要使用

訂閱/發布功能,比如,日志模塊產生日志並發送到隊列中,隊列連接兩個worker,一個負責打印到控制台,一個負責打印到日志文件,

則隊列需要將內部消息同時發送到兩個worker中做不同的處理。

 

2. 交換:

前面示例當中,我們是直接使用隊列來收發消息的,那並不是RabbitMQ的完整模型,完整模型當中還包含有"交換",消息不應該直接發送給

隊列,而是發送給"交換"。交換的模型很簡單,其一端連接生產者,一端連接消息隊列,交換需要一定的規則來對收到消息做處理,比如發給

某個隊列,亦或者丟棄該消息,這個規則我們稱之為"交換類型": direct, topic, headers ,fanout,本文以及后面的文章會對幾種類型做詳細

介紹,可以使用如下方式創建交換,如下其名稱為logs,類型是=fanout,fanout類型不會關系消息,只是簡單對消息廣播到連接隊列。

channel.exchange_declare(exchange='logs',
                         type='fanout')

 

含有交換的完整模型如下圖所示:

              

 

3. 臨時隊列:

在不需要多個生產者或者消費者共享隊列的時候,隊列名稱我們是不關心的,RabbitMQ提供了一種隨機生成隊列的方式:

result = channel.queue_declare()

result.method.queue中含有隊列的名稱

當我們需要設置消費者斷開,隊列自動銷毀,可以使用如下方式,標記exlusive=True:

result = channel.queue_declare(exclusive=True)

 

4. 綁定:

隊列和交換均建立完成,此時我們需要綁定隊列和交換,這樣交換才知道向哪些隊列發送消息,方式如下:

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

綁定之后的模型如下:

               

 

完成模型,包含worker:

               

 

5. 測試代碼:

emit_log.py -- 產生日志消息,發送到交換:

 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8 
 9 channel.exchange_declare(exchange='logs',
10                          type='fanout')
11 
12 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
13 channel.basic_publish(exchange='logs',
14                       routing_key='',
15                       body=message)
16 print(" [x] Sent %r" % message)
17 connection.close()

 

reveive_logs.py--臨時隊列綁定交換,接收日志消息並處理;

 1 #!/usr/bin/env python
 2 import pika
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='logs',
 9                          type='fanout')
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 channel.queue_bind(exchange='logs',
15                    queue=queue_name)
16 
17 print(' [*] Waiting for logs. To exit press CTRL+C')
18 
19 def callback(ch, method, properties, body):
20     print(" [x] %r" % body)
21 
22 channel.basic_consume(callback,
23                       queue=queue_name,
24                       no_ack=True)
25 
26 channel.start_consuming()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM