Python-RabbitMQ消息隊列的發布與訂閱


RabbitMQ消息隊列的發布與訂閱類似於廣播,一端發送消息,多個客戶端可以同時接收到消息

fanout:所有綁定到exchange的queue都可以接收消息

 消息發布端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')

message = 'Hello World!'
channel.basic_publish(exchange='fanout_logs',
                      routing_key='',
                      body=message)

print('發送消息:', message)
connection.close()

 消息訂閱端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbitmq會隨機分配一個名字
# exclusive=True會在使用此queue的消息訂閱端斷開后,自動將queue刪除

queue_name = result.method.queue
print('當前queue名稱:', queue_name)

channel.queue_bind(exchange='fanout_logs', queue=queue_name)


def callback(ch, method, properties, body):
    print(ch, method, properties)
    print('收到數據:', body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True,
                      )

print('開始等待消息')
channel.start_consuming()

 消息發布端需要在消息訂閱端運行之后運行,不然消息訂閱端收不到消息

開啟3個消息訂閱端和一個消息發布端

消息發布端發布消息

3個消息訂閱端會同時接收到消息

direct:指定的queue才能收到消息

把queue綁定關鍵字,消息發布端根據關鍵字將消息發送到exchange,exchange根據關鍵字將消息發送到指定隊列

消息發布端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  # 關鍵字
message = 'Hello World!'

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print('級別 [%s] 發送數據 [%s]' % (severity, message))
connection.close()

消息訂閱端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]  # 關鍵字

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)


def callback(ch, method, properties, body):
    print(ch, method, properties)
    print('收到來自%s的消息:%s' % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

print('開始等待消息')
channel.start_consuming()

 開啟3個消息訂閱端,分別引用關鍵字info、warning,warning、error,error、info,開啟一個消息發布端,引用關鍵字wanrning

只有引用關鍵字warning的消息訂閱端才收到消息

沒有引用關鍵字warning的就收不到消息

topic:符合條件的queue才能收到消息

消息訂閱端指定條件,符合條件的queue才能被消息訂閱端收到消息

#會匹配任意個任意字符串,*會匹配一個任意字符串

單個#會匹配全部字符串,單個*無法匹配

消息發布端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = 'Hello World!'

channel.basic_publish(exchange='topic_logs',
                      routing_key=severity,
                      body=message)

print('給 [%s] 發送消息 [%s]' % (severity, message))
connection.close()

 消息訂閱端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

keys = sys.argv[1:]

for key in keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=key)


def callback(ch, method, properties, body):
    print(ch, method, properties)
    print('來自 [%s] 的信息 [%s]' % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
print('開始接收消息')
channel.start_consuming()

 開啟3個消息訂閱端和一個消息發布端

3個消息訂閱端,一個*.exe匹配以.exe結尾的,一個#匹配全部字符串,一個python.exe只匹配字符串為python.exe的

消息發布端引用關鍵字qq.exe

很顯然,匹配以.exe結尾和匹配全部字符串的消息訂閱端將會收到消息,只匹配字符串為python.exe的不會收到消息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM