Exchange-fanout 廣播模式


一、前言

  我們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。

 

  RabbitMQ中的Exchange有四種類型,不同的類型有着不同的路由策略,RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種。Exchange是按照什么邏輯將消息路由到Queue的?RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。

  • fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
  • direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。
  • topic類型中所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
  • headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

二、fanout廣播模式

  之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange模式中的fanout。

  生產端: 

# -*- coding: UTF-8 -*-

import pika

# 聲明一個連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

# 聲明一個管道
channel = connection.channel()

# 聲明一個exchange,並命名exchange 和定義它的類型
channel.exchange_declare(exchange='test-exchange',
                         exchange_type='fanout')

# 消息內容
message = 'hello,world!'

# 生成一個消息
channel.basic_publish(exchange='test-exchange',
                      routing_key='',  # 設置為空
                      body=message,
                      )
print('[x] Sent %r' % message)

# 關閉連接
connection.close()

  消費端:

# -*- coding: UTF-8 -*-

import pika

# 創建一個連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
# 創建一個管道
channel = connection.channel()

# 聲明一個exchange,並命名exchange 和定義它的類型
# 和生產端一致
channel.exchange_declare(exchange='test-exchange',
                         exchange_type='fanout')

# 在這里我們要聲明隊列
# 雖然在生產端我們沒有聲明隊列,是因為消息是廣播的,對所有隊列發送
# 在此聲明的隊列也不用命名,rabbitmq會隨機分配名稱
# 也是就是說消費端還是要從隊列中獲取消息內容

result = channel.queue_declare(exclusive=True)
# exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除

# 隊列名
queue_name = result.method.queue

# 綁定exchange
channel.queue_bind(exchange='test-exchange',
                   queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

  生產者產生的消息,每一個消費者都能取到。

  注: 廣播模式是實時的,也就是說,在生產端產生消息的時候,消費端必須是處於監聽狀態的,才能從隊列中獲取消息,否則就收不到本次消息,哪怕再次啟動。就像廣播電台和收音機一樣,廣播電台的內容一直播放,如果收音機不打開就收聽不到,即使后面打開了,先前的內容也播放完了。

   注:

exclusive=True

# 消費端只有存在的時候,隊列名才會存在,並且exchange會向這個隊列中
# 發送消息,一旦消費端關閉,隊列queue就會被刪除

  運行3個消費者,如果關閉一個,rabbitmq中的隊列直接被刪除了一個

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM