目標:
代碼實現(direct_product.py)

1 # __author__ = 'STEVEN' 2 import pika,sys 3 #開啟socket 4 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 5 #開啟一個通道 6 channel = connection.channel() 7 #這里不用再創建隊列 8 channel.exchange_declare(exchange='d_logs',exchange_type='direct') 9 #通過命令行運行本代碼,指定要發送的消息級別,如果不指定則為info 10 severity = sys.argv[1] if len(sys.argv)>1 else 'info' 11 #通過命令行輸入消息級別后的參數 12 mes = ''.join(sys.argv[2:]) or 'hello world' 13 #添加了消息級別,即將消息發布到指定了級別的消息隊列 14 channel.basic_publish(exchange='d_logs',routing_key=severity,body=mes) 15 print('[x] send the mes%s to queue'%mes) 16 connection.close()
代碼實現(direct_consumer.py)

1 # __author__ = 'STEVEN' 2 import pika,sys 3 #建立socket 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 #開啟通道 6 channel = connection.channel() 7 #通道先聲明exchange類型 8 channel.exchange_declare(exchange='d_logs',exchange_type='direct') 9 #聲明queue 10 result = channel.queue_declare(exclusive=True) 11 #獲取queue_name 12 queue_name = result.method.queue 13 #從sys中獲取消息級別等信息 14 severitis = sys.argv[1:] 15 if not severitis: 16 sys.stderr.write('usage:%s [info] [error] [warning]'%sys.argv[0]) 17 sys.exit(1) 18 #遍歷綁定各個exchange和queue以及消息級別 19 for severity in severitis: 20 #綁定queue 21 channel.queue_bind(exchange='d_logs',routing_key=severity,queue=queue_name) 22 #回調函數 23 def callback(ch,method,properties,body): 24 print('[x] receive mess :%s'%body.decode()) 25 #指定消費相關參數 26 channel.basic_consume(callback,queue=queue_name,no_ack=True) 27 print('[*] is waiting for the message') 28 #開啟消費 29 channel.start_consuming()
運行注意:
先開啟消費端,用命令行指定接受級別