RabbitMQ 消息隊列
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
通俗的講,就是生產者消費者模型。
實現生產者消費者模型的核心就是隊列!通過隊列去連接完成操作!
這個模型解決了耦合性,讓生產者和消費者之間沒有直接的聯系,而是通過隊列建立橋梁。這其中最重要的就是隊列。
一、隊列的使用:
1、基於Queue實現生產者消費者模型,python的隊列 queue。
q=queue.Queue() q.put() q.qsize() #隊列內消息個數 q.get() 先進先出 #!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
如上:python擁有自己的隊列模式,但是有一點不得不注意,他的隊列只能在同一進程內多線程間起作用,不能跨進程操作!
2、隊列的作用:
存儲消息、數據
保證消息的順序
保證數據的交付
3、消息隊列解決了兩個問題:
解耦 天然的解耦,程序間調用不再使用接口,而是調用消息隊列的接口把執行結果放到隊列中,實現解耦。【實際開發過程中,一定要想盡辦法降低程序間的耦合】
異步: 異步操作,程序不再等待執行結果,而是提供接收接口
優點:解決排隊問題
缺點:不能保證任務及時執行
應用場景:去哪兒購買飛機票,
同步:
優點:保證任務及時執行
缺點:排隊問題
4、有關大並發的事宜:
web網站:
之前部署:apache 1000 - 2000 一台機器同一時刻只能承載這么多請求!
常用部署:nginx 10000 - 20000
什么算大並發?有三個指標:
pv = page visit 頁面訪問量 【一天訪問量上億才算大網站】【具有分散性,高峰時間段訪問明顯】
一般,一億的pv用10台server web cluster 集群,pv分散到實際的用戶上並不多,推算到秒級別訪問量
uv = user visit 用戶訪問量【相對頁面訪問量很小】
qps = 每秒鍾的訪問流量 or 用戶量
對同一個請求的訪問,多個機器每個都負責一點,這就叫分布式運算。
5、引入rabbitmq的原因:
異步操作,應對大並發。
解決Python隊列不能跨進程執行的弊端。
對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。
它作為一個獨立的組件,可以同時存在多個隊列【可以為任何程序提供隊列】,每個隊列對應不同的應用程序,隊列之間平行存在,相互獨立,不能混用!
與Python Queue隊列相比,消息不會直接放到rabbitmq隊列中,而是會先通過一次消息過濾,看他所屬哪個程序,然后放到對應的隊列中。
在不存在外力的影響下,RabbitMQ 隊列中的消息,如果不消費就永遠存在!只有消費之后會消失。
RabbitMQ 是一個公共的組件,能為多語言間提供隊列【例如:生產者是Java,消費者是Python】。
6、生產者消費者通過RabbitMQ隊列建立通信的步驟:
生產者:
端口 ip 認證信息
創建隊列
向隊列發送消息
消費者:
端口 ip 認證信息
從指定的隊列中取消息
7、RabbitMQ 配置 (Python連接隊列)
1.7.1、客戶端若想調用RabbitMQ,需要安裝對應的API。在python中安裝pika,通過pika連接rabbitmq
pip install pika
注意:若想使用、遠程連接rabbitmq server的話,就需要在RabbitMQ隊列這個組件內配置權限信息
8、rabbitmq 創建用戶和設置權限
1)首先在rabbitmq server上創建一個用戶:rabbitmqctl add_user aaa 密碼
2)配置權限,授權允許從外面訪問所有隊列:rabbitmqctl set_permissions -p / aaa "." "." ".*" #授權所有!

set_permissions [-p vhost] {user} {conf} {write} {read} vhost The name of the virtual host to which to grant the user access, defaulting to /. ,默認是 / user The name of the user to grant access to the specified virtual host. conf A regular expression matching resource names for which the user is granted configure permissions. write A regular expression matching resource names for which the user is granted write permissions. read A regular expression matching resource names for which the user is granted read permissions.
9、連接隊列時,需要建立通信,配置認證信息(生產者和消費者都必須與隊列建立聯系)
credentials = pika.PlainCredentials('aaa', '密碼') #配置認證信息
#建立鏈接 connection = pika.BlockingConnection(pika.ConnectionParameters('10.211.55.5',5672,'/',credentials)) channel = connection.channel() #隊列連接通道
10、生成者與消費者之間收發消息通信
生產者: 發送消息語法: #先聲明queue(沒有就創建,有就使用) channel.queue_declare(queue='task123',durable=True) #建立通信 channel.basic_publish(exchange='', #給負責消息過濾的方法傳遞參數 routing_key='task123', #路由 body='Hello World2!',消息內容 ) routing_key = 'task123' #路由 把消息隊列先傳給Exchange 然后再轉到task123隊列上 消費者: 獲取消息語法: def callback(ch,mothod,properties,body) #獲取消息執行的函數 參數解釋: ch:指隊列通道 method:請求方法 properties: 消息參數 body:消息內容 channel.basic_consume( callback, #取到消息之后,調用函數 callback queue="xxxxx", #隊列名稱 no_ack=True, ) #開始消費 channel.start_consuming() # 阻塞模式
注意: 一般申明隊列只需要在生產者端申明,但消費者端也可以申明。是防止如果生產者沒有啟動,消費者先啟動后沒有隊列會報錯的問題。此時服務端如果有相同代碼,會檢查如果有相同隊列就不創建。消費者再次申明隊列,目的是:消費者要清楚去哪里取數據!
二、RabbitMQ框架圖:
三、示例(默認RabbitMQ隊列已啟動)
1、實現簡單隊列建立通信

import pika credentials = pika.PlainCredentials('aaa', '123') # 配置認證的用戶 密碼 parameters = pika.ConnectionParameters(host="192.168.152.132", credentials=credentials) connection = pika.BlockingConnection(parameters) # 建立一個鏈接對象 channel = connection.channel() # 隊列連接通道 channel.queue_declare(queue='hello') # 聲明隊列queue 用rabbitmqctl list_queuse 查看 channel.basic_publish(exchange='', routing_key='hello', body='server hello world') # routing_key 路由代表要發送的隊列 body是發送的內容 print('server send "hello world"') connection.close() # 關閉連接 類似socket

#消費者是一種阻塞模式,會一致取數據 import pika credentials = pika.PlainCredentials('aaa', '123') # 配置認證的用戶 密碼 parameters = pika.ConnectionParameters(host="192.168.152.132", credentials=credentials) connection = pika.BlockingConnection(parameters) # 建立一個鏈接對象 channel = connection.channel() # 隊列連接通道 channel.queue_declare(queue='hello') # 聲明queue 用rabbitmqctl list_queuse 查看 def callback(ch, method, properties, body): print("Recived %r" % ch, method, properties, body) channel.basic_consume(callback, # 取到消息后,執行callback函數 queue='hello', # 從hello隊列獲取數據 no_ack=True ) print("waiting for message") channel.start_consuming() # 進入阻塞模式
2、消息持久化
如何保證隊列中的消息被完全處理完畢?我們正常的思維應該是:沒有處理完,應該返回隊列。但是在上面的代碼中,如果消費者客戶端掛了或者在處理的過程中停止了,不僅消息沒有處理完畢,同時隊列中也沒有了。
2.1、模擬客戶端中斷 觀察服務端隊列的數據會不會返回(答案:不會)
#- 開啟一個生產者,兩個消費者 #- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡10秒期間中斷,表示出錯,它不會報告給服務端 #- 這時隊列中為零,另一客戶端也不會取到值
測試代碼如下:

#生產者 import pika credentials = pika.PlainCredentials("aaa","123") #授權的賬號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #建立socket channel = connection.channel() #創建rabbitmq協議通道 channel.queue_declare(queue='hello') #通過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()

#消費者: import pika import time credentials = pika.PlainCredentials("aaa","123") #授權的賬號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #建立socket channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("received msg...start process",body) time.sleep(10) print("end process...") channel.basic_consume(callback, queue='hello', no_ack=True) print(' Waiting for messages. To exit press CTRL+C') channel.start_consuming()
#解決辦法: #0、發送消息時,在函數中添加以下參數,保證消息持久化 properties=pika.BasicProperties( delivery_mode=2, # make message persistent),# 數字代表狀態:2保持消息持久化;1處理中;0處理完畢; #1、消費確認的問題! #在消費者端,從隊列中獲取信息函數中有一個參數:no_ack = True 的意思是消息處理后,向rabbit-server確認消息已消費完畢。 刪除這個參數,不再確認信息已消費,rabbit-server的消息隊列中會一致存在數據 #2、解決消費后數據還存在問題! # 解決rabbit-server中消息被消費后數據還存在的情況,在消費者處理消息的函數中,使用ch.basic_ack(delivery_tag=method.delivery_tag)與生產者手動確認,消息處理完畢! #通過這兩個參數,同時保證了消費者能夠消費完數據不掛,同時消費完后rabbit-server收到消費完的消息把被消費的數據刪除 #1. 生產者端發消息時,加參數 消息持久化 properties=pika.BasicProperties( delivery_mode=2, # make message persistent), #2. 消費者端,消息處理完畢時,發送確認包 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, #取到消息后,調用callback 函數 queue='task1',) #no_ack=True) #消息處理后,不向rabbit-server確認消息已消費完畢
2.2、模擬測試 觀察服務端隊列的數據會不會返回(答案:會)
#- 開啟一個服務端,兩個客戶端 #- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它會報給服務端,服務端隊列還有值 #- 這時啟動另一客戶端還可以取到值

#生產者 import pika credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 #聲明queue channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='task1', #路由 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), #新加入參數 body='Hello World2!') print(" [x] Sent 'Hello World!'") connection.close()

#消費者 import pika import time credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 #聲明queue channel.queue_declare(queue='hello',durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) #time.sleep(10) print('msg handle done...',body) ch.basic_ack(delivery_tag=method.delivery_tag) #消息處理完畢,確認 channel.basic_consume(callback, #取到消息后,調用callback 函數 queue='task1',) #no_ack=True) #消息處理后,不向rabbit-server確認消息已消費完畢 print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #阻塞模式
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差不多。此時,先啟動消息生產者,然后再分別啟動3個消費者,通過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上。
3、隊列【及消息】持久化
當我們把rabbitmq-server重啟后,發現所有的消息就都丟失了?這種問題怎么辦?假如我們在某一個隊列中加入了上萬條消息,突然消息隊列重啟了。。。那是不是我們還得手動去添加消失的消息么?不用!以下是解決辦法:
1、生產者在聲明隊列的時候使用參數,保持隊列持久化 durable = True。
注意:一定是要在隊列第一次聲明的時候前添加,不能對已經生成的隊列重新再進行一次設置,否則會報錯【無法重新修改隊列】。
2、再通過參數delivery_mode = 2 把消息也變成持久化的。即便是rabbitmq服務重啟后,也不會丟消息
#隊列持久化 【僅設置單個】 channel.queue_declare(queue='hello',durable=True) systemctl restart rabbitmq-server #重啟服務發現hello隊列還在,但是消息不在 rabbitmqctl list_queues #查看消息隊列 #hello #隊列和消息持久化 【兩個參數都存在】 channel.queue_declare(queue='hello',durable=True) properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), systemctl restart rabbitmq-server #重啟服務發現隊列和消息都還在 rabbitmqctl list_queues #查看消息隊列 #hello 1

import pika credentials = pika.PlainCredentials("aaa","123") #授權的賬號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #建立socket channel = connection.channel() #創建rabbitmq協議通道 channel.queue_declare(queue='hello',durable=True) #通過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()

#消費者 import pika import time credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 #聲明queue channel.queue_declare(queue='hello',durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) #time.sleep(10) print('msg handle done...',body) ch.basic_ack(delivery_tag=method.delivery_tag) #消息處理完畢,確認 channel.basic_consume(callback, #取到消息后,調用callback 函數 queue='task1',) #no_ack=True) #消息處理后,不向rabbit-server確認消息已消費完畢 print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #阻塞模式
4、多消費者間分發(消費者的公平分發)
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
傳統模式:輪詢(排隊)獲取隊列中的數據,如果有一個消費者處理慢了,其他的消費者需要一直等待。那怎么解決並發的問題呢?別人處理快慢與本人處理的速度無關。
解決方案:以誰先處理完,誰就先獲得數據的原則【消息處理完畢才會再拿一條數據】。在消費者端加上這個條件判斷:channel.basic_qos(prefetch_count=1) # 公平分發,能者多勞,每次執行一個。
注意:生產者的代碼不變,消費者代碼中加入每次處理一次的參數:channel.basic_qos(prefetch_count=1) # 公平分發

import pika credentials = pika.PlainCredentials("aaa","123") #授權的賬號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #建立socket channel = connection.channel() #創建rabbitmq協議通道 channel.queue_declare(queue='hello',durable=True) #通過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()

import pika import time credentials = pika.PlainCredentials('aaa', '123') # 配置認證的用戶 密碼 parameters = pika.ConnectionParameters(host="192.168.11.144", credentials=credentials) connection = pika.BlockingConnection(parameters) # 建立一個鏈接對象 channel = connection.channel() # 隊列連接通道 def callback(ch, method, properties, body): print("Recived %r" % ch, method, properties, body) time.sleep(10) print('msg handle done...',body) ch.basic_ack(delivery_tag=method.delivery_tag) # 這個是表示消費者處理完了 channel.basic_qos(prefetch_count=1) # 公平分發 channel.basic_consume(callback, # 取到消息后,執行callback函數 queue='hello', # no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 進入阻塞模式
5、消息訂閱發布 Publish\Subscribe(消息發布\訂閱) 消息過濾 exchange
廣播策略:每個人都能收到;或是過濾某些人可以接收
一個生產者,對應對個消費者!
exchange type 過濾類型
fanout = 廣播
direct = 組播
topic = 規則播
header = 略過。。。
之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout
headers: 通過headers 來決定把消息發給哪些queue
a、廣播模式(一個生產者,多個消費者):
1、路由指定為空!所有消息都發給exchange處理轉到隊列,轉到哪個隊列就需要exchange指定,所以在建立連接的時候要指定名字。
注意:exchange只負責轉發不負責存放消息!如果沒有隊列綁定消息就會扔掉!
2、自動生成隊列名,然后使用完之后再刪掉
隊列參數exclusive=True唯一的,rabbit 隨機生成一個名字。
3、生產者和消費者端都要聲明隊列,以排除生成者未啟動,消費者獲取報錯的問題
4、生產者發送一條消息,說有的消費者都能接收到!高效,效率的完成發送!
應用場景:新浪微博 訂閱模式,只有當前登錄的用戶才可以收到實時發送的消息

import pika import sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 channel.exchange_declare(exchange='logs',type='fanout') #聲明隊列 exchange名字和類型 message = ' '.join(sys.argv[1:]) or "info: Hello World!" #獲取外界輸入的信息,否則就是hello world channel.basic_publish(exchange='logs', #指定exchange的名字 routing_key='', #注意,不需要指定隊列名! body=message) #信息 print(" [x] Sent %r" % message) connection.close()

import pika credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 channel.exchange_declare(exchange='logs', type='fanout') queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = queue_obj.method.queue #獲取隊列名 print('queue name',queue_name,queue_obj) #打印會列名 channel.queue_bind(exchange='logs',queue=queue_name) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
b、direct 組播模式:有選擇的接收消息(exchange type=direct)
1、有選擇的接收消息(exchange type=direct),RabbitMQ還支持根據關鍵字發送,相當於是添加了一個過濾地帶!
即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
2、發什么類型的,什么類型的接收,在接收端運行的時候加參數,指定接收的類型。
3、routing_key = 'xxx' 與廣播相比不再為空,隊列由執行時手動輸入獲取,然后路由指定發送到哪個隊列。
4、按照類型:生產者發送指定類型的消息;消費者循環綁定隊列,如果不存在不接收
例子:就像廣播電台,要想接收生產者發送的數據,必須是綁定且在線!如果斷開一段時間再接收該電台消息,之前的訊息就不會再收到!
應用場景:日志分類處理邏輯 【注:可以同時存在多個消費者】

import pika import sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 channel.exchange_declare(exchange='direct_log',type='direct') #聲明消息隊列及類型 log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' #日志等級 message = ' '.join(sys.argv[1:]) or "info: Hello World!" #接收手動輸入的消息內容 channel.basic_publish(exchange='direct_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()

import pika,sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = queue_obj.method.queue print('queue name',queue_name,queue_obj) log_levels = sys.argv[1:] #日志等級 info warning error danger #判斷存不存在,不存在退出 if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) #循環綁定隊列 for level in log_levels: channel.queue_bind(exchange='direct_log', queue=queue_name, routing_key=level) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
c、topic規則播
話題類型,可以根據正則進行更精確的匹配,按照規則過濾。exchange type = topic,僅改下類型即可!
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
# 表示可以匹配 0 個 或 多個 單詞
* 表示只能匹配 一個 單詞
To receive all the logs run: python receive_logs_topic.py "#" To receive all logs from the facility "kern": python receive_logs_topic.py "kern.*" Or if you want to hear only about "critical" logs: python receive_logs_topic.py "*.critical" You can create multiple bindings: python receive_logs_topic.py "kern.*" "*.critical" And to emit a log with a routing key "kern.critical" type: python emit_log_topic.py "kern.critical" "A critical kernel error"
#測試執行如下: #客戶端一: - python3 receive1.py *.django #客戶端二: - python3 receive1.py mysql.error #客戶端三: - python3 receive1.py mysql.* #服務端: - python3 receive1.py #匹配相應的客戶端

import pika import sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 channel.exchange_declare(exchange='topic_log',type='topic') #log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info' message = ' '.join(sys.argv[1:]) or "all.info: Hello World!" channel.basic_publish(exchange='topic_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()

import pika,sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = queue_obj.method.queue log_levels = sys.argv[1:] # info warning errr if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for level in log_levels: channel.queue_bind(exchange='topic_log', queue=queue_name, routing_key=level) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
6、RPC remote producer call 遠程執行調用
從上邊所有的例子中你有沒有發現,上面的隊列都是單向執行的,需要有發送端和接收端。如果遠程的一台機器執行完畢再返回結果,那就實現不了了。如果讓他執行完返回,這種模式叫什么呢?RPC(遠程過程調用),snmp就是典型的RPC。
那RabbitMQ能不能返回呢,怎么返回呢?可以讓機器既是發送端又是接收端。但是接收端返回消息怎么返回?可以發送到發過來的queue里么?答案當然是不可以,如果還是存在原先的隊列就會直接陷入死循環!所以返回時,需要讓消息內部指定再建立一個隊列queue,把結果發送新的queue里。
同時,為了服務端返回的queue不寫死,在客戶端給服務端發指令的的時候,同時帶一條消息說,你結果返回給哪個queue
在執行多個消息任務的時候,怎么區分判斷哪個消息是先執行呢?答案就是,在發任務時,再額外提交一個唯一標識符。
task1,task2異步執行,但是返回的順序是不固定的,為了區分是誰執行完的,在發送的任務添加唯一標識符,這樣取回的時候就能區分。
設置一個異步RPC
聲明一個隊列reply_to,作為返回消息結果的隊列
發送消息隊列,消息中帶唯一標識uid
監聽reply_to隊列,直到有結果
在類中聲明監聽

__author__ = 'Administrator' #1. 定義fib函數 #2. 聲明接收指令的隊列名rpc_queue #3. 開始監聽隊列,收到消息后 調用fib函數 #4. 把fib執行結果,發送回客戶端指定的reply_to 隊列 import subprocess import pika import time credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列連接通道 channel.queue_declare(queue='rpc_queue2') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def run_cmd(cmd): cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) result = cmd_obj.stdout.read() + cmd_obj.stderr.read() return result def on_request(ch, method, props, body): cmd = body.decode("utf-8") print(" [.] run (%s)" % cmd) response = run_cmd(cmd) ch.basic_publish(exchange='', routing_key=props.reply_to, #隊列 接收客戶端傳過來的隊列,返回 properties=pika.BasicProperties(correlation_id = props.correlation_id), body=response) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(on_request, queue='rpc_queue2') print(" [x] Awaiting RPC requests") channel.start_consuming()

# 1.聲明一個隊列,作為reply_to返回消息結果的隊列 # 2. 發消息到隊列,消息里帶一個唯一標識符uid,reply_to # 3. 監聽reply_to 的隊列,直到有結果 import queue import pika import uuid class CMDRpcClient(object): def __init__(self): credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #命令的執行結果的queue #聲明要監聽callback_queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): """ 收到服務器端命令結果后執行這個函數 :param ch: :param method: :param props: 服務器端返回的消息結果! :param body: :return: """ if self.corr_id == props.correlation_id: self.response = body.decode("gbk") #把執行結果賦值給Response def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) #唯一標識符號 self.channel.basic_publish(exchange='', routing_key='rpc_queue2', properties=pika.BasicProperties( reply_to = self.callback_queue, #傳遞要返回的消息隊列 correlation_id = self.corr_id, #唯一id ), body=str(n)) #循環監聽 while self.response is None: self.connection.process_data_events() #檢測監聽的隊列里有沒有新消息,如果有,收,如果沒有,返回None #檢測有沒有要發送的新指令 return self.response cmd_rpc = CMDRpcClient() print(" [x] Requesting fib(30)") response = cmd_rpc.call('ipconfig') print(response)