一、前言
我們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。

RabbitMQ中的Exchange有四種類型,不同的類型有着不同的路由策略,RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種。Exchange是按照什么邏輯將消息路由到Queue的?RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
- fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
- direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。
- topic類型中所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
- headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
二、fanout廣播模式
之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange模式中的fanout。
生產端:
# -*- coding: UTF-8 -*-
import pika
# 聲明一個連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
# 聲明一個管道
channel = connection.channel()
# 聲明一個exchange,並命名exchange 和定義它的類型
channel.exchange_declare(exchange='test-exchange',
exchange_type='fanout')
# 消息內容
message = 'hello,world!'
# 生成一個消息
channel.basic_publish(exchange='test-exchange',
routing_key='', # 設置為空
body=message,
)
print('[x] Sent %r' % message)
# 關閉連接
connection.close()
消費端:
# -*- coding: UTF-8 -*-
import pika
# 創建一個連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
# 創建一個管道
channel = connection.channel()
# 聲明一個exchange,並命名exchange 和定義它的類型
# 和生產端一致
channel.exchange_declare(exchange='test-exchange',
exchange_type='fanout')
# 在這里我們要聲明隊列
# 雖然在生產端我們沒有聲明隊列,是因為消息是廣播的,對所有隊列發送
# 在此聲明的隊列也不用命名,rabbitmq會隨機分配名稱
# 也是就是說消費端還是要從隊列中獲取消息內容
result = channel.queue_declare(exclusive=True)
# exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
# 隊列名
queue_name = result.method.queue
# 綁定exchange
channel.queue_bind(exchange='test-exchange',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
生產者產生的消息,每一個消費者都能取到。
注: 廣播模式是實時的,也就是說,在生產端產生消息的時候,消費端必須是處於監聽狀態的,才能從隊列中獲取消息,否則就收不到本次消息,哪怕再次啟動。就像廣播電台和收音機一樣,廣播電台的內容一直播放,如果收音機不打開就收聽不到,即使后面打開了,先前的內容也播放完了。
注:
exclusive=True # 消費端只有存在的時候,隊列名才會存在,並且exchange會向這個隊列中 # 發送消息,一旦消費端關閉,隊列queue就會被刪除
運行3個消費者,如果關閉一個,rabbitmq中的隊列直接被刪除了一個

