前言
使用場景:
我們希望接收端指定接收某個隊列的消息的時候,此時為直連交換類型
原理:
每個接收端在綁定交換機的時候可以設置相應路由鍵,每個發送端在發送消息的時候可以指明路由鍵,交換機可以根據路由鍵將數據發送到指定的隊列中,這樣接收端就能從指定的隊列獲取到相應的數據
代碼
send
import pika hostname = '127.0.0.1' parameters = pika.ConnectionParameters(hostname) connection = pika.BlockingConnection(parameters) # 創建通道 channel = connection.channel() # 定義交換機,設置類型為direct channel.exchange_declare(exchange='change_dir', exchange_type='direct') # 定義三個路由鍵 routings = ['info', 'warning', 'error'] # 將消息依次發送到交換機,並設置路由鍵 for routing in routings: message = '%s message.' % routing channel.basic_publish(exchange='change_dir', routing_key=routing, body=message) print(message) connection.close()
receive
import sys, pika hostname = '127.0.0.1' parameters = pika.ConnectionParameters(hostname) connection = pika.BlockingConnection(parameters) # 創建通道 channel = connection.channel() # 定義交換機,設置類型為direct channel.exchange_declare(exchange='change_dir', exchange_type='direct') # 從命令行獲取路由鍵參數,如果沒有,則設置為info routings = sys.argv[1:] if not routings: routings = ['info'] # 生成臨時隊列, result = channel.queue_declare(queue='change_dir', exclusive=True) # exclusive=True 當接收端退出的時候 會銷毀那個臨時創建的隊列 queue_name = result.method.queue print(queue_name) for routing in routings: # 綁定到交換機上,設置路由鍵 channel.queue_bind(exchange='change_dir', queue=queue_name, routing_key=routing) def callback(ch, method, properties, body): print(" [x] Received %r" % (body,)) channel.basic_consume( queue='change_dir', # 指定隊列名 on_message_callback=callback, # 從隊列里獲取消息 auto_ack=False # mq服務器掛掉 防止任務丟失 ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
演示
開啟一個發送端一個接收端並且讓發送端發送數據

接收端

發送者發送了三個數據 但是其只能接收一個數據 屬於自己路由匹配的
