在之前的文章RabbitMQ入門(二)工作隊列中,我們創建了一個工作隊列。工作隊列背后的假設是每一項任務都被准確地傳送至一個worker。在本文中,我們將會做一些不同的事情——我們將會把一個消息發送至許多消費者中。這種模式被稱為訂閱模式(publish/subscribe)
。
為了解釋這種模式,我們將會構建一個簡單的日志系統。它包含兩個程序——第一個將會產生消息,第二個將會接收並輸出這些消息。
在我們的日志系統中,每一個正在運行的接收程序都會收到消息。在這種方式下,我們可以運行一個接收程序來接收並將日志保存至硬盤;同時,我們還能運行另一個接收程序,在屏幕上觀察到日志的輸出。
特別地,發送的這些消息都會被廣播到所有的接收程序。
交換(Exchanges)
在之前的文章中,我們向隊列發送消息,從隊列中接受消息。現在是時候介紹RabbitMQ中的全部消息轉發模式。
讓我們快速地瀏覽下之前文章中講了些什么:
- 一個生產者(Producer)是用於產生消息的用戶應用程序;
- 一個隊列(Queue)是緩存區,用於儲存消息;
- 一個消費者(Consumer)是用於接收消息的用戶應用程序。
RabbitMQ中消息傳輸模式的核心思想是生產者絕不會直接向隊列發送任何消息。實際上,通常情況下生產者甚至都不會知道消息是否會被發送至隊列。
生產者會將消息發送至交換(exchange)
。交換
並不復雜。一方面它從生產者中接受消息,另一方面將消息推送至隊列。交換
必須知道,當它接受一個消息時,它該怎么做。是否這個消息會附加至一個特殊的隊列?是否它會附加至許多隊列?或者它會被丟棄。這個規則用交換類型(exchange type)
來定義。
有一些可用的交換類型
:直接分發(direct)
,通配分發(topic)
,headers
和復制分發(fanout)
。我們將會集中講最后一個——fanout。我們創建一個交換
,類型為fanout,並取名為logs:
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
fanout交換非常簡單。顧名思義,它會將所有它知道的接收隊列的消息都廣播出去。而這也正是我們的日志系統所需要的。
現在,我們可以發布已經命名好的隊列了:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
臨時隊列
你也許還記得在之前的文章中,我們需要給隊列取名。但是呢,給隊列命名太麻煩了——我們需要將workers指定到同一個隊列。當你需要在生產者和消費者之間共享隊列的時候,給隊列命名又是很重要的。
這種情形並不適合我們的日志系統。我們想要監聽所有的消息,而不是部分消息。同時,我們僅對當前的流動消息感興趣,而不是之前的消息。為了解決這個問題,我們需要做兩件事情。
首先,無論何時我們連接到RabbitMQ,我們需要一個新的空隊列。為此,我們創建一個隨機命名的隊列,或者更好的是,讓RabbitMQ Server來給我們創建一個隨機命名的隊列。因此,我們可以利用queue_declare
命令,設置queuq
參數為空:
result = channel.queue_declare(queue='')
此時,result.method.queue
會包含一個隨機命名的隊列,比如說,它會和amq.gen-JzTY20BRgKO-HjmUJj0wLg
類似。
其次,一旦消息者的連接關閉,我們需要刪除隊列。這可以用exclusive
參數搞定:
result = channel.queue_declare(queue='', exclusive=True)
綁定(Bindings)
我們已經創建了一個fanout 交換和隊列。現在我們需要告訴交換,將消息發送至隊列。交換與隊列之間的關系叫做綁定(Bindings)
。
channel.queue_bind(exchange='logs',
queue=result.method.queue)
從現在開始,logs
交換將會在我們的隊列后追加消息。
代碼
生產者代碼(emit_log.py):
# -*- coding: utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
消費者代碼(receive_log.py):
# -*- coding: utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
開啟四個終端,其中一個用於保存日志:
python3 receive_log.py > logs_from_rabbit.log
另一個用於觀察日志輸出:
python3 receive_log.py
日志產生:
python3 emit_log.py
監聽綁定:
sudo rabbitmqctl list_bindings
運行截圖如下:
本次分享到此結束,感謝大家閱讀~