【python】-- RabbitMQ Publish\Subscribe(消息發布\訂閱)


RabbitMQ RabbitMQ Publish\Subscribe(消息發布\訂閱)

1對1的消息發送和接收,即消息只能發送到指定的queue里,但這樣使用有些局限性,有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了

Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息:

  1. fanout:所有bind到此exchange的queue都可以接收消息
  2. direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息
  3. topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
  4. headers:通過headers 來決定把消息發給哪些queue(這個很少用)

 

一、fanout訂閱發布模式(廣播模式)

這種模式是所有綁定exchange的queue都可以接收到消息(純廣播的,所有消費者都能收到消息)

 

1、生產者(fanout_produce):

因為生產者是以廣播的形式,所以這邊不需要聲明queue

import pika
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#聲明exchange的名字和類型
channel.exchange_declare(exchange="practice",
                         exchange_type="fanout")

#廣播一個消息
channel.basic_publish(
    exchange="practice",
    routing_key='',
    body="hello word"
)

print("send hello word")

connect.close()

2、消費者(fan_consumers):

消費者這邊要聲明一個唯一的queue_name的對象,並且從對象中獲取queue名(每個唯一的queue就代表這一個收音機,鎖定着FM頻道(exchange))

import pika
#創建socket鏈接,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange名字和類型
channel.exchange_declare(exchange="practice", exchange_type="fanout")
#rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除,result是queue的對象實例
result = channel.queue_declare(exclusive=True)  # 參數 exclusive=True 獨家唯一的
queue_name = result.method.queue
#綁定exchange, 相當於打開收音機,鎖定了一個FM頻道
channel.queue_bind(exchange="practice",
                   queue=queue_name)


#回調函數
def callback(ch,method,properties,body):
    print("{0}".format(body))
#消費信息
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
#開始消費
channel.start_consuming()

 注:

生產者發送廣播是實時的,消費者需要提前等待生產者發生消息,這個又叫訂閱發布,收音機模式,就像只有收音機打開了才能聽到鎖定的FM頻道,但是如果在節目開始一段時間,再打開收音機的話,之前的節目就收聽不到了

 

 

二、direct訂閱發布模式(廣播模式)

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

 

1、生產者(direct_prodecer)

import pika
import sys
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#聲明exchange的名字和類型
channel.exchange_declare(exchange="direct_practice",
                         exchange_type="direct")
# 日志級別,通過三元運算符,獲取外部傳遞的參數
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#廣播一個消息
channel.basic_publish(
    exchange="direct_practice",
    routing_key=severity,
    body=message
)

print("send %s  %s" % (severity, message))

connect.close()

2、消費者(direct_consumers)

import pika
import sys
#創建socket鏈接,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
#聲明exchange名字和類型
channel.exchange_declare(exchange="direct_practice", exchange_type="direct")
#rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除,result是queue的對象實例
result = channel.queue_declare(exclusive=True)  # 參數 exclusive=True 獨家唯一的
queue_name = result.method.queue

#獲取外部手動輸入的安全級別
severities = sys.argv[1:]
#如果沒有外部沒有輸入參數就直接退出
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

#循環遍歷綁定消息隊列
for severity in severities:
    #綁定exchange, 相當於打開收音機,鎖定了一個FM頻道
    channel.queue_bind(exchange="direct_practice",
                       queue=queue_name,
                       routing_key=severity)

print("Waiting for msg")


#回調函數
def callback(ch,method,properties,body):
    print("%s  %s" % (method.routing_key, body))
#消費信息
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
    #開始消費
channel.start_consuming()

 3、輸出

服務端:

C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 direct_producer.py info
send info  hello world

C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 direct_producer.py warning
send warning  hello world

客戶端:

C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 direct_consumer.py info
Waiting for msg
info  b'hello world'

  

 

topic定義發布模式(廣播模式)

direct把info、error、warning綁定級別把消息區分了,如果想做的更細致的區分,如在Linux上有一個系統日志,所有程序都在這個日志里面打日志。那如果我想知道什么是mysql的發出來的日志,什么是apache發出來的日志。然后mysql日志里面同時是info、warning、error。所以需要做更細的區分,更細致的消息過濾

1、生產者(topic_producer)

import pika
import sys
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#聲明exchange的名字和類型
channel.exchange_declare(exchange="topic_practice",
                         exchange_type="topic")
# 關鍵字,通過三元運算符,獲取外部傳遞的參數
keys = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#廣播一個消息
channel.basic_publish(
    exchange="topic_practice",
    routing_key=keys,
    body=message
)

print("send %s  %s" % (keys, message))

connect.close()

  

2、消費者(topic_consumers)

import pika
import sys
#創建socket實例,聲明管道
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()

#聲明exchange的名字和類型
channel.exchange_declare(exchange="topic_practice",
                         exchange_type="topic")
# 關鍵字,通過三元運算符,獲取外部傳遞的參數
keys = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "hello world"
#廣播一個消息
channel.basic_publish(
    exchange="topic_practice",
    routing_key=keys,
    body=message
)

print("send %s  %s" % (keys, message))

connect.close()

3、輸出

服務端:

C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 topic_producer.py mysql.info
send mysql.info  hello world

C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 topic_producer.py adapter.error
send adapter.error  hello world

客戶端:

C:\Users\dell\PycharmProjects\untitled\practice\rabbitmq_r>python3 topic_consumers.py mysql.* *.error
Waiting for msg
mysql.info  b'hello world'
adapter.error  b'hello world'

4、匹配規則

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM