Preface: this time I'm putting together a blog post on RabbitMQ. Compared with the previous Redis post, RabbitMQ is a fair bit harder. This post quotes some English explanations along the way, but they aren't hard to follow. For downloading and installing RabbitMQ, see my post redis&rabbitMQ安裝.
RabbitMQ is a message queue. Recall the queues we've already studied: threading Queue (for exchanging data between threads) and the multiprocessing Queue (for a parent process talking to its children, or children of the same parent talking to each other). But two independent programs cannot exchange data through those queues; for that we need a broker in the middle, and that broker is RabbitMQ.
1. Simple RabbitMQ queue communication
The flow is: data is first sent to the exchange, and the exchange then forwards it to the appropriate queue. The pika module is a Python client for RabbitMQ's API. The receiving side registers a callback function that is invoked as soon as a message arrives. Once a message has been received by one consumer, it is deleted from the queue. OK, with that background, let's look at a simple RabbitMQ queue example.
The send side:
```python
import pika

# connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # open a channel; the queues run inside it

# declare the queue
channel.queue_declare(queue='hello1')

# In RabbitMQ a message can never be sent directly to the queue,
# it always needs to go through an exchange.
# publish data to the queue
channel.basic_publish(exchange='',           # data goes to the exchange first; the exchange forwards it to the matching queue
                      routing_key='hello1',  # send to the 'hello1' queue
                      body='Hello World!!')  # the message
print("[x] Sent 'Hello World!!'")
connection.close()
```
The receive side:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# You may ask why we declare the queue again -- we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello1')  # declare the queue so the program can't fail on a missing one


def callback(ch, method, properties, body):
    print("-->ch", ch)
    print("-->method", method)
    print("-->properties", properties)
    print("[x] Received %r" % body)  # once one consumer receives the message, it is deleted from the queue


channel.basic_consume(callback,        # callback function, invoked as soon as a message arrives
                      queue='hello1',
                      no_ack=False)    # send an acknowledgment to the server after consuming; False is the default

print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
Output (the comments I added to the code above should make it clear what's happening~):

```
rabbitMQ_1_send.py
[x] Sent 'Hello World!!'

rabbitMQ_2_receive.py
[*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
[x] Received b'Hello World!!'
```
Some further testing turned up two findings:
- Run rabbitMQ_1_send.py first, with rabbitMQ_2_receive.py not yet running: the receiver still gets the data once it starts.
- Start several receivers (e.g. 3), then run the sender: client 1 gets the message. Run the sender again: client 2 gets it. Run it once more: client 3 gets it.

By default, RabbitMQ dispatches the producer's (p) messages to the consumers (c) in turn, much like load balancing.
2. Message acknowledgments (ack)
Looking back at the example above, you'll notice the line no_ack=False (send an acknowledgment to the server after the message is consumed; False is the default). My English barely scraped through CET-4, but even so, the explanation of ack below struck me as really well written!! So I'm sharing it:
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed, and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
Think of the send side as a producer and the receive side as a consumer. The producer sends task A; the consumer receives task A and processes it; once processing is done, task A is deleted from the message queue. Now we hit a problem: suppose the consumer receives task A but crashes partway through processing, while task A has already been deleted from the message queue. Task A was never actually completed, so the task/message is effectively lost. The fix is for the consumer to send an ack back to the server (RabbitMQ) only after it has received task A and successfully finished processing it! On receiving the ack, the server knows task A is done and only then deletes it from the message queue. If no ack arrives, it hands task A to the next consumer, until the task is processed successfully.
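To make this concrete, here is a minimal sketch of a consumer that acks explicitly, written against the pre-1.0 pika API used throughout this post (the queue name and do_work are placeholders of mine):

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_a')  # illustrative queue name

def do_work(body):
    pass  # stand-in for the real task

def callback(ch, method, properties, body):
    do_work(body)
    # Ack only after the work succeeded. If the consumer dies before this line
    # runs, RabbitMQ sees no ack and re-queues the message for another consumer.
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='task_a', no_ack=False)
channel.start_consuming()
```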
3. Message durability
We've already seen that a consumer started after the producer has published can still receive the data.
But if the producer publishes and RabbitMQ is then restarted, the consumer cannot receive the data.
e.g. if the RabbitMQ server dies while messages are in transit, you'll find the previous message queue no longer exists afterwards. This is where message durability comes in: a durable queue doesn't vanish when the server goes down; it is kept persistently. Here's the English explanation of message durability:
We have learned how to make sure that even if the consumer dies, the task isn't lost (that's the default behaviour; pass no_ack=True if you want to disable it). But our tasks will still be lost if the RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
```python
channel.queue_declare(queue='hello', durable=True)
```
Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with a different name, for example task_queue:
```python
channel.queue_declare(queue='task_queue', durable=True)
```
This queue_declare change needs to be applied to both the producer and consumer code.
At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.
```python
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
```
The English above explains message durability well. It takes two steps:
- Make the queue durable. In code, for the hello queue: channel.queue_declare(queue='hello', durable=True)
- Make the messages in the queue persistent. In code: properties=pika.BasicProperties(delivery_mode=2)
One point to watch out for here:
If a hello queue already exists with different parameters (say, the non-durable one from the earlier examples), running the code that declares it durable with persistent messages may raise an error!
Because: RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error.
To get around this, declare a queue with a name (queue_name) different from the one used before.
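Alternatively (my suggestion, not from the original text), you can drop the stale queue and redeclare it; pika exposes queue_delete for this, though note it discards any messages still sitting in the queue:

```python
channel.queue_delete(queue='hello')                 # drop the old, non-durable queue (and its messages!)
channel.queue_declare(queue='hello', durable=True)  # the durable declaration now succeeds
```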
4. Fair dispatch
If RabbitMQ simply handed messages to consumers in order, ignoring each consumer's load, a consumer on a low-spec machine could easily pile up a backlog it can't work through while a high-spec consumer stays mostly idle. To fix this, configure prefetch_count=1 on each consumer: it tells RabbitMQ not to send me a new message until I've finished processing the current one.
Complete code with message durability + fair dispatch.
Producer side:

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)  # durable queue

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
```
Consumer side:

```python
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # simulate work: one second per '.' in the message
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # explicit acknowledgment

channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
```
When I ran the programs above, one line in the consumer's callback baffled me: ch.basic_ack(delivery_tag=method.delivery_tag). Remove it and the consumer still receives messages just fine. So what on earth is it for??
With the producer's messages made persistent, the consumer side needs ch.basic_ack(delivery_tag=method.delivery_tag): it guarantees that only after a message has actually been processed does the consumer send an ack, and only then does the server delete that message from the queue. Without the ack, a message a consumer was working on when it died would simply be gone; with it, RabbitMQ re-queues the message for another worker.
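One way to watch this happen (my suggestion, not from the original post): comment out the basic_ack line, let the consumer receive a message, and inspect the queue with rabbitmqctl. The message shows up as unacknowledged instead of being deleted, and it is re-queued once the consumer's connection closes:

```
rabbitmqctl list_queues name messages_ready messages_unacknowledged
```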
5. Message publish/subscribe
The examples so far have all been essentially 1-to-1: a message can only be sent to one designated queue. Sometimes, though, you want a message to reach all the queues, broadcast-style, and that's where the exchange comes in. PS: if you're interested in Redis's publish/subscribe, see my post python之redis.
An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.
An exchange is declared with a type, which determines which queues qualify to receive a message:
fanout: every queue bound to this exchange receives the message
direct: the message goes to the queue(s) bound to the exchange with exactly that routingKey
topic: the message goes to every queue whose binding key (which may be a pattern) matches the routingKey
Pattern syntax: # matches zero or more words, * matches exactly one word (words are separated by dots); a couple of worked examples follow this list
e.g. #.a matches a.a, aa.a, aaa.a, etc.
*.a matches a.a, b.a, c.a, etc.
Note: using routing key # with exchange type topic is equivalent to fanout
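Two concrete cases of my own may make the wildcards clearer:

- binding key kern.* matches kern.info and kern.crit, but not kern (since * is exactly one word) and not kern.disk.err (two words after kern).
- binding key kern.# matches kern.info, kern.disk.err, and even plain kern, since # matches zero or more words.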
Below I'll go over fanout, direct and topic in turn:
1) fanout
fanout: every queue bound to this exchange receives the message.
The send side:

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',  # empty (the default) for fanout
                      body=message)
print("[x] Sent %r" % message)
connection.close()
```
The receive side:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

# Don't give the queue a name (we just want to receive the broadcast);
# RabbitMQ will assign a random queue name for us. exclusive=True deletes
# the queue automatically once the consumer using it disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# bind the declared queue to the exchange
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
```
Two things to note (a session sketch follows this list):
- fanout is a broadcast: on the send side, routing_key='' (empty, the default, for fanout).
- the receive side has the line result = channel.queue_declare(exclusive=True). Its purpose: declare a queue without naming it (so it can receive the broadcast); RabbitMQ assigns a random queue name, and exclusive=True deletes the queue automatically once the consumer using it disconnects.
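A quick way to see the broadcast in action (the file names are just what I'd save the two scripts as):

```
# terminal 1 and terminal 2: start two subscribers; each gets its own random queue
python fanout_receive.py
# terminal 3: publish once; both subscribers print the message
python fanout_send.py
```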
2) Receiving messages selectively (exchange type=direct)
RabbitMQ also supports routing by keyword: queues are bound to the exchange with keywords, the sender publishes data to the exchange along with a keyword, and the exchange uses that keyword to decide which queue the data should be delivered to.
The send side:

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,  # non-empty keyword telling the exchange where to route (info, error, ...)
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
```
The receive side:

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
```
Honestly, the first time I read this code I was completely lost~ What made it click was testing at the command line with one send side and two receive sides (start the two receivers first, then the sender). A sketch of such a session follows.
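Assuming the two scripts are saved as direct_send.py and direct_receive.py (names are mine), a session might look like:

```
# receive side 1: listen for 'info' only
python direct_receive.py info
# receive side 2: listen for 'info' and 'error'
python direct_receive.py info error
# send side: publish with routing key 'error'
python direct_send.py error "disk failure"
```

Only receiver 2 prints [x] 'error':b'disk failure'; receiver 1, bound only to info, stays silent.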
3) Finer-grained message filtering: topic (for reference)
Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
My English really isn't great~ but cross-referencing the (pretty rough) Youdao translation with my own reading, I worked out roughly what the passage above is saying.
An example: system errors get sent to A, MySQL errors get sent to B. For B, receiving MySQL error messages is already solved by selective receiving (exchange type=direct): just use the keyword error! But now B has a new requirement: not all error messages, only certain specified ones. Filtering further within a category of messages is exactly the finer-grained filtering that topic provides.
The send side:

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')  # exchange type is topic

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
```
The receive side:

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
```
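And a session sketch, with script names of my own choosing:

```
# receive only kernel logs:
python topic_receive.py "kern.*"
# receive all critical logs, whatever their source:
python topic_receive.py "*.critical"
# receive everything:
python topic_receive.py "#"
# send one message:
python topic_send.py kern.critical "A critical kernel error"
```

Here all three receivers would print the kern.critical message: it matches kern.*, *.critical, and # alike.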
6. RPC (Remote Procedure Call)
Here's a definition of RPC I found on Baidu (it's really just like the FTP project I built earlier: I send a command from the client and the server returns the related information):

RPC uses the client/server model. The requesting program is the client, and the service-providing program is the server. First, the client process sends a call message carrying the procedure's parameters to the server process, then waits for the reply. On the server side, the process stays asleep until a call message arrives. When one arrives, the server extracts the parameters, computes the result, sends the reply, and then waits for the next call. Finally, the client process receives the reply, obtains the result, and execution continues.
Now for the focus of this section: RPC messaging. I found it hard when I first learned it, but once it clicked, the idea behind RPC messaging felt genuinely illuminating, and the example code is brilliant!!
After the server receives a message sent by the client, it calls the callback function and performs the task; then it still has to send the corresponding result back to the client. But how does the server return information to the client? And if multiple clients are connected to the server, how does the server know which client to send to??
The RPC server listens on rpc_queue by default. It clearly can't push the information meant for the client onto rpc_queue (rpc_queue carries the data that clients send to the server).
The sensible approach is for the server to set up another queue and return the information to the matching client through it. But another problem arises: if that queue is created on the server side, the client has no way of knowing its queue_name, and without even the queue name, how is the client supposed to receive any data??
The solution:
When the client sends the command, it also tells the server: once the task is done, return the result through such-and-such a queue. The client then just listens on that queue, and that's it.
The client side:
```python
import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()
        # declare a random queue, for listening for the returned result
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue  # the queue name

        self.channel.basic_consume(self.on_response,  # call on_response as soon as a reply arrives
                                   no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):  # the reply callback
        # Commands can take different amounts of time to execute: command 1 may be sent
        # before command 2, yet command 2's result may reach the client first. Without
        # the check below, the client would mistake command 2's result for command 1's.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None  # will hold the message returned after the command runs
        self.corr_id = str(uuid.uuid4())  # used to identify the command (and its ordering)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',  # the client sends commands to rpc_queue
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # return the command's result to this queue
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()  # poll the queue for data (non-blocking)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
```
The server side:
```python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)  # the number received from the client

    print(" [.] fib(%s)" % n)
    response = fib(n)

    # the server sends the result back to the props.reply_to queue (declared by
    # the client when it sent the command); correlation_id carries the unique
    # random identifier of the command this reply answers
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge the request


channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(on_request,  # call on_request as soon as a message arrives
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
```
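If everything is wired up correctly, starting the server first and then the client should produce output along these lines (the file names are mine; fib(30) is 832040):

```
$ python rpc_server.py
 [x] Awaiting RPC requests
 [.] fib(30)

$ python rpc_client.py
 [x] Requesting fib(30)
 [.] Got 832040
```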