RabbitMQ廣播:fanout模式


一、

消息的廣播需要exchange:exchange是一個轉發器,其實把消息發給RabbitMQ里的exchange

fanout: 所有bind到此exchange的queue都可以接收消息,廣播

direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息

topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

headers:通過headers來決定把消息發給哪些queue,用的比較少

原理圖:

發布者端:

'''
發布者publisher
'''
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',  # exchange名字為logs
                         type='fanout')
# 通過命令行自己輸入消息,沒輸入就是hello world
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# 廣播不需要寫queue,routing_key為空
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print("send :", message)
connection.close()

訂閱者端:

'''
訂閱者subscriber
'''
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         type='fanout')
# 不指定queue名字,rabbit會隨機分配一個唯一的queue,
# exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
# 發送端沒有聲明queue,為什么接收端需要queue?看上面原理圖就明白
result = channel.queue_declare(exclusive=True)
# 拿到的隨機的queue名字
queue_name = result.method.queue
# 需要知道從哪個轉發器上去收所以需要綁定
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print("Wait for logs...")
def callback(ch, method, properties, body):
    print("received:", body)
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

 運行結果:

'''
先啟動發布者,再啟動訂閱者,為什么訂閱者收不到信息?
原理類似於收音機收聽廣播:
訂閱者相當於收音機,發布者相當於廣播信號
所以這個接收是實時的,訂閱者啟動之后,才能收到發布者發出的廣播
'''

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM