1. 訂閱/發布:
前面worker示例中的每個任務都是只發送給某一個worker,如果我們多個worker都需要接收處理同一個任務,此時就要使用
訂閱/發布功能,比如,日志模塊產生日志並發送到隊列中,隊列連接兩個worker,一個負責打印到控制台,一個負責打印到日志文件,
則隊列需要將內部消息同時發送到兩個worker中做不同的處理。
2. 交換:
前面示例當中,我們是直接使用隊列來收發消息的,那並不是RabbitMQ的完整模型,完整模型當中還包含有"交換",消息不應該直接發送給
隊列,而是發送給"交換"。交換的模型很簡單,其一端連接生產者,一端連接消息隊列,交換需要一定的規則來對收到消息做處理,比如發給
某個隊列,亦或者丟棄該消息,這個規則我們稱之為"交換類型": direct, topic, headers ,fanout,本文以及后面的文章會對幾種類型做詳細
介紹,可以使用如下方式創建交換,如下其名稱為logs,類型是=fanout,fanout類型不會關系消息,只是簡單對消息廣播到連接隊列。
channel.exchange_declare(exchange='logs', type='fanout')
含有交換的完整模型如下圖所示:

3. 臨時隊列:
在不需要多個生產者或者消費者共享隊列的時候,隊列名稱我們是不關心的,RabbitMQ提供了一種隨機生成隊列的方式:
result = channel.queue_declare()
result.method.queue中含有隊列的名稱
當我們需要設置消費者斷開,隊列自動銷毀,可以使用如下方式,標記exlusive=True:
result = channel.queue_declare(exclusive=True)
4. 綁定:
隊列和交換均建立完成,此時我們需要綁定隊列和交換,這樣交換才知道向哪些隊列發送消息,方式如下:
channel.queue_bind(exchange='logs', queue=result.method.queue)
綁定之后的模型如下:

完成模型,包含worker:

5. 測試代碼:
emit_log.py -- 產生日志消息,發送到交換:
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='logs', 10 type='fanout') 11 12 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 13 channel.basic_publish(exchange='logs', 14 routing_key='', 15 body=message) 16 print(" [x] Sent %r" % message) 17 connection.close()
reveive_logs.py--臨時隊列綁定交換,接收日志消息並處理;
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='logs', 9 type='fanout') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 channel.queue_bind(exchange='logs', 15 queue=queue_name) 16 17 print(' [*] Waiting for logs. To exit press CTRL+C') 18 19 def callback(ch, method, properties, body): 20 print(" [x] %r" % body) 21 22 channel.basic_consume(callback, 23 queue=queue_name, 24 no_ack=True) 25 26 channel.start_consuming()
