RabbitMQ RabbitMQ Publish\Subscribe(消息發布\訂閱)
1對1的消息發送和接收,即消息只能發送到指定的queue里,但這樣使用有些局限性,有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息:
- fanout:所有bind到此exchange的queue都可以接收消息
- direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息
- topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
- headers:通過headers 來決定把消息發給哪些queue(這個很少用)
一、fanout訂閱發布模式(廣播模式)
這種模式是所有綁定exchange的queue都可以接收到消息(純廣播的,所有消費者都能收到消息)

1、生產者(fanout_produce):
因為生產者是以廣播的形式,所以這邊不需要聲明queue
import pika
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange的名字和類型
channel.exchange_declare(exchange="practice",
exchange_type="fanout")
#廣播一個消息
channel.basic_publish(
exchange="practice",
routing_key='',
body="hello word"
)
print("send hello word")
connect.close()
2、消費者(fan_consumers):
消費者這邊要聲明一個唯一的queue_name的對象,並且從對象中獲取queue名(每個唯一的queue就代表這一個收音機,鎖定着FM頻道(exchange))
import pika
#創建socket鏈接,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange名字和類型
channel.exchange_declare(exchange="practice", exchange_type="fanout")
#rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除,result是queue的對象實例
result = channel.queue_declare(exclusive=True) # 參數 exclusive=True 獨家唯一的
queue_name = result.method.queue
#綁定exchange, 相當於打開收音機,鎖定了一個FM頻道
channel.queue_bind(exchange="practice",
queue=queue_name)
#回調函數
def callback(ch,method,properties,body):
print("{0}".format(body))
#消費信息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
#開始消費
channel.start_consuming()
注:
生產者發送廣播是實時的,消費者需要提前等待生產者發生消息,這個又叫訂閱發布,收音機模式,就像只有收音機打開了才能聽到鎖定的FM頻道,但是如果在節目開始一段時間,再打開收音機的話,之前的節目就收聽不到了
二、direct訂閱發布模式(廣播模式)
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

1、生產者(direct_prodecer)
import pika
import sys
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange的名字和類型
channel.exchange_declare(exchange="direct_practice",
exchange_type="direct")
# 日志級別,通過三元運算符,獲取外部傳遞的參數
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#廣播一個消息
channel.basic_publish(
exchange="direct_practice",
routing_key=severity,
body=message
)
print("send %s %s" % (severity, message))
connect.close()
2、消費者(direct_consumers)
import pika
import sys
#創建socket鏈接,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange名字和類型
channel.exchange_declare(exchange="direct_practice", exchange_type="direct")
#rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除,result是queue的對象實例
result = channel.queue_declare(exclusive=True) # 參數 exclusive=True 獨家唯一的
queue_name = result.method.queue
#獲取外部手動輸入的安全級別
severities = sys.argv[1:]
#如果沒有外部沒有輸入參數就直接退出
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
#循環遍歷綁定消息隊列
for severity in severities:
#綁定exchange, 相當於打開收音機,鎖定了一個FM頻道
channel.queue_bind(exchange="direct_practice",
queue=queue_name,
routing_key=severity)
print("Waiting for msg")
#回調函數
def callback(ch,method,properties,body):
print("%s %s" % (method.routing_key, body))
#消費信息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
#開始消費
channel.start_consuming()
3、輸出
服務端:
C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 direct_producer.py info send info hello world C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 direct_producer.py warning send warning hello world
客戶端:
C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 direct_consumer.py info Waiting for msg info b'hello world'
topic定義發布模式(廣播模式)
direct把info、error、warning綁定級別把消息區分了,如果想做的更細致的區分,如在Linux上有一個系統日志,所有程序都在這個日志里面打日志。那如果我想知道什么是mysql的發出來的日志,什么是apache發出來的日志。然后mysql日志里面同時是info、warning、error。所以需要做更細的區分,更細致的消息過濾

1、生產者(topic_producer)
import pika
import sys
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange的名字和類型
channel.exchange_declare(exchange="topic_practice",
exchange_type="topic")
# 關鍵字,通過三元運算符,獲取外部傳遞的參數
keys = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#廣播一個消息
channel.basic_publish(
exchange="topic_practice",
routing_key=keys,
body=message
)
print("send %s %s" % (keys, message))
connect.close()
2、消費者(topic_consumers)
import pika
import sys
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange的名字和類型
channel.exchange_declare(exchange="topic_practice",
exchange_type="topic")
# 關鍵字,通過三元運算符,獲取外部傳遞的參數
keys = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#廣播一個消息
channel.basic_publish(
exchange="topic_practice",
routing_key=keys,
body=message
)
print("send %s %s" % (keys, message))
connect.close()
3、輸出
服務端:
C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 topic_producer.py mysql.info send mysql.info hello world C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 topic_producer.py adapter.error send adapter.error hello world
客戶端:
C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 topic_consumers.py mysql.* *.error Waiting for msg mysql.info b'hello world' adapter.error b'hello world'
4、匹配規則

