Python有多種插件都支持RabbitMQ,本文介紹的是RabbitMQ推薦的Pika插件。使用pip直接安裝即可 pip install pika
。
一、RabbitMQ簡介
1. MQ簡介
MQ(Message Queue,消息隊列),是一個在消息傳輸過程中保存消息的容器,多用在分布式系統之間進行通信。
MQ優勢
- 應用解耦:提高系統容錯性和可維護性。
- 異步提速:提升用戶體驗和系統吞吐量,MQ可短時間接收和保存大量消息(請求),其他服務可以異步地進行消息的消費。
- 削峰填谷:提高系統穩定性,當MQ中保存了大量消息(請求)后,其他服務就可以按照自身的需要從容地對MQ中的消息進行消費,而不必直接去處理大量請求(短時間內的大量請求在橫軸為時間、縱軸為請求量的圖上就是峰頂)。
MQ劣勢
- 如果使用MQ的話,MQ就屬於系統引入的外部依賴,一旦MQ宕機,就會對業務造成影響。
2. RabbitMQ簡介
AMQP(Advanced Message Queuing Protocol,高級消息隊列協議),是一個網絡協議,同時也是一個應用協議的開放標准,專為面向消息的中間件而設計。RabbitMQ是基於AMQP並使用Erlang語言開發的消息中間件,在安裝RabbitMQ時如果沒有安裝Erlang的話,需要根據提示下載並安裝Erlang。當前有多種語言都有對應的插件來支持RabbitMQ的使用,包括Java、Python、Ruby等,本文主要介紹通過Python操作RabbitMQ。
RabbitMQ相關概念
- Broker:接收和分發消息的應用,RabbitMQ Server就是Message Broker。
- Virtual host:出於多租戶和安全因素的設計,把AMQP的基本組件划分到一個虛擬的分組中,類似於網絡中的namespace概念,當多個不同的用戶使用同一個RabbitMQ Server提供的服務時,可以划分出多個vhost,每個用戶在自己的vhost創建exchange/queue等。
- Connection:publisher/consumer和broker之間的TCP連接。
- Channel:如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷都將是巨大的,效率也是非常低的。Channel是在Connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread會創建單獨的Channel進行通信,AMQP的method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection,極大減少了操作系統建立TCP連接的開銷。
相關術語
- producer:生產者,向隊列中發送消息的程序。(在圖表中通常使用P表示)
- queue:隊列,用於存儲消息,定義在RabbitMQ內部,queue本質上是一個消息緩存buffer,生產者可以往里發送消息,消費者也可以從里面獲取消息。(在圖表中通常使用Q表示)
- consumer:消費者,等待並從消息隊列中獲取消息的程序。(在圖表中通常使用C表示)
- exchange:交換機,用於將producer發送來的消息發送到queue,事實上,producer是不能直接將message發送到queue,必須先發送到exchange,再由exchange發送到queue。(在圖表中通常使用X表示)
- 注:生產者和消費者可能在不同的程序或主機中,當然也有可能一個程序有可能既是生產者,也是消費者。
Windows上RabbitMQ安裝
在 https://rabbitmq.com/install-windows.html 上找到 Direct Downloads
下的exe安裝文件並下載即可,安裝過程中可能會提示你下載Erlang,按提示打開網站下載安裝即可 https://www.erlang.org/downloads ,下載文件如 OTP 24.0 Windows 64-bit Binary File
。 都安裝好后執行以下命令創建用戶:
- 在命令窗口cd到RabbitMQ安裝目錄的
RabbitMQ Server\rabbitmq_server-3.8.17\sbin
。 - 安裝RabbitMQ網頁插件:
rabbitmq-plugins.bat enable rabbitmq_management
。 - 新建用戶:
rabbitmqctl.bat add_user admin 123456
。(此處用戶名和密碼自己設置即可) - 設置管理員:
rabbitmqctl.bat set_user_tags admin administrator
。 (將剛才新建的用戶設置為管理員身份) - 設置用戶權限:
rabbitmqctl.bat set_permissions -p / dj123 “." ".” “.*”
。 - 停止和啟動RabbitMQ服務:
net stop RabbitMQ && net start RabbitMQ
。(安裝RabbitMQ后會自動啟動服務,所以這一步也可以不用執行) - 使用網頁插件查看RabbitMQ相關信息:直接訪問
http://localhost:15672/
,並輸入剛才創建的用戶和密碼即可。(這一步正常訪問網頁,則表示RabbitMQ安裝成功了) - 注:RabbitMQ安裝成功后,就可以使用Python程序(或其他語言程序)通過RabbitMQ服務發送和接收消息了。
二、RabbitMQ六種模式
此部分內容為 https://www.rabbitmq.com/getstarted.html 的筆記,都是關於RabbitMQ的基礎使用,沒有涉及太深的原理和參數使用,想了解更多的話也可以去官網看看,或者直接網上搜一搜。另外,運行本文中的示例代碼時,請務必先安裝好RabbitMQ服務和Pika插件。
六種模式分別為Hello world、Work queues(工作隊列)、Publish/Subscribe(發布訂閱)、Routing(路由)、Topics(主題)、RPC(遠程調用),處了RPC模式外,其余的模式都是從簡單的使用到更為靈活的使用,其實從示例代碼就可以看出,基本的代碼框架都是差不多的,只是在不同的模式下達到的效果不同,它們各有各的特點,在實際使用中應該根據需求來選擇具體的模式,而不是簡單粗暴的選擇最“高端”的模式。
1. Hello world模式
Hello world模式是最簡單的一種模式,一個producer發送message,另一個consumer接收message。
producer示例 send.py
:producer端發送message會涉及最簡單的5個步驟,具體見代碼注釋。
import pika
# 1. 創建一個到RabbitMQ server的連接,如果連接的不是本機,
# 則在pika.ConnectionParameters中傳入具體的ip和port即可
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))
# 2. 創建一個channel
channel = connection.channel()
# 3. 創建隊列,queue_declare可以使用任意次數,
# 如果指定的queue不存在,則會創建一個queue,如果已經存在,
# 則不會做其他動作,官方推薦,每次使用時都可以加上這句
channel.queue_declare(queue='hello')
# 4. 發布消息
channel.basic_publish(
exchange='', # RabbitMQ中所有的消息都要先通過交換機,空字符串表示使用默認的交換機
routing_key='hello', # 指定消息要發送到哪個queue
body='Hello world!') # 消息的內容
# 5. 關閉連接
connection.close()
consumer示例 receive.py
:consumer端接收message會涉及最簡單的6個步驟,具體見代碼注釋。
import pika
def main():
# 1. 創建一個到RabbitMQ server的連接,如果連接的不是本機,
# 則在pika.ConnectionParameters中傳入具體的ip和port即可
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))
# 2. 創建一個channel
channel = connection.channel()
# 3. 創建隊列,queue_declare可以使用任意次數,
# 如果指定的queue不存在,則會創建一個queue,如果已經存在,
# 則不會做其他動作,官方推薦,每次使用時都可以加上這句
channel.queue_declare(queue='hello')
# 4. 定義消息處理程序
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
# 5. 接收來自指定queue的消息
channel.basic_consume(
queue='hello', # 接收指定queue的消息
on_message_callback=callback, # 接收到消息后的處理程序
auto_ack=True) # 指定為True,表示消息接收到后自動給消息發送方回復確認,已收到消息
print('[*] Waiting for message.')
# 6. 開始循環等待,一直處於等待接收消息的狀態
channel.start_consuming()
if __name__ == '__main__':
main()
注:示例代碼來自 https://www.rabbitmq.com/tutorials/tutorial-one-python.html
2. Work queues模式
Work queues模式即工作隊列模式,也稱為Task queues模式(任務隊列模式),這個模式的特點在於,同一個queue可以允許多個consumer從中獲取massage,RabbitMQ默認會從queue中依次循環的給不同的consumer發送message。與Hello world模式相比,工作隊列模式在示例代碼中有以下不同:
- hello world模式中指定了
auto_ack=True
,表示consumer接收到message之后自動發送確認標識,告訴RabbitMQ可以從隊列中移除該條message了。工作隊列模式下,使用了默認值,即需要手動確認ch.basic_ack(delivery_tag=method.delivery_tag)
。 - hello world模式中只有一個consumer去處理queue中的message,工作隊列模式中可以有多個consumer去處理queue中的message。
- 工作隊列模式中可以使message持久化,保證RabbitMQ服務掛掉之后message依然不被丟失。
- 工作隊列模式中可以手動標記message已接收並處理完成(這一步在編程時千萬別忘了,否則RabbitMQ會認為該條message沒有被處理,會一直保留在隊列中,並適時發送到別的consumer中)。
producer示例 new_task.py
:注意如果聲明queue時參數不一樣,則建議換一個名稱,因為RabbitMQ中不允許同名但實際上是不同的兩個queue存在,比如指定了 durable=True
參數。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 聲明durable=True可以保證RabbitMQ服務掛掉之后隊列中的消息也不丟失,原理是因為
# RabbitMQ會將queue中的消息保存到磁盤中
channel.queue_declare(queue='task_queue')
message = 'Hello World! 555'
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
# delivery_mode=2可以指定此條消息持久化,防止RabbitMQ服務掛掉之后消息丟失
# 但是此屬性設置並不能百分百保證消息真的被持久化,因為RabbitMQ掛掉的時候
# 它可能還保存在緩存中,沒來得及同步到磁盤中
# properties=pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent %r" % message)
connection.close()
consumer示例 worker.py
:RabbitMQ會將queue中的消息依次發送給不同的consumer,所以這里的示例可以用同樣的代碼多開幾個客戶端進行測試。
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 聲明durable=True可以保證RabbitMQ服務掛掉之后隊列中的消息也不丟失,原理是因為
# RabbitMQ會將queue中的消息保存到磁盤中
channel.queue_declare(queue='task_queue')
print(' [*] Waiting for messages.')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
# 此處以消息中的“.”的數量作為sleep的值,是為了模擬不同消息處理的耗時
time.sleep(body.count(b'.'))
print(" [x] Done")
# 手動標記消息已接收並處理完畢,RabbitMQ可以從queue中移除該條消息
ch.basic_ack(delivery_tag=method.delivery_tag)
# prefetch_count表示接收的消息數量,當我接收的消息沒有處理完(用basic_ack
# 標記消息已處理完畢)之前不會再接收新的消息了
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
我運行了兩個 worker.py
,並執行了5次 new_task.py
,分別發送了5條message:“Hello World! 111”、“Hello World! 222”、“Hello World! 333”、“Hello World! 444”和“Hello World! 555”,兩個worker的打印輸出如下:可以看出兩個worker是輪流獲取到消息的,並且同一條消息也不會發送給兩個worker,這也是RabbitMQ默認的消息發送機制。
[*] Waiting for messages.
[x] Received 'Hello World! 111'
[x] Done
[x] Received 'Hello World! 333'
[x] Done
[x] Received 'Hello World! 555'
[x] Done
[*] Waiting for messages.
[x] Received 'Hello World! 222'
[x] Done
[x] Received 'Hello World! 444'
[x] Done
注:示例代碼來自 https://www.rabbitmq.com/tutorials/tutorial-two-python.html
3. Publish/Subscribe模式
相對於工作/任務模式中的一個message只能發送給一個consumer使用,發布訂閱模式會將一個message同時發送給多個consumer使用,其實就是producer將message廣播給所有的consumer。
交換機
這個模式中會引入交換機(exchange)的概念,其實在RabbitMQ中,所有的producer都不會直接把message發送到queue中,甚至producer都不知道message在發出后有沒有發送到queue中,事實上,producer只能將message發送給exchange,由exchange來決定發送到哪個queue中。
exchange的一端用來從producer中接收message,另一端用來發送message到queue,exchange的類型規定了怎么處理接收到的message,發布訂閱模式使用到的exchange類型為 fanout
,這種exchange類型非常簡單,就是將接收到的message廣播給已知的(即綁定到此exchange的)所有consumer。
當然,如果不想使用特定的exchange,可以使用 exchange=''
表示使用默認的exchange,默認的exchange會將消息發送到 routing_key
指定的queue,可以參考工作(任務)隊列模式和Hello world模式。
fanout類型
在使用fanout類型的exchange時,並不是只有一個queue,然后將queue中的message每個consumer都發一份,而是會為每個已知(綁定)的consumer創建一個queue,然后廣播message到對應queue中,fanout類型的exchange會將從生產者接收到的message廣播到所有的綁定到自己的queue中,這個queue通常是由consumer端指定的專屬於consumer自己的、由RabbitMQ隨機命名的queue,由此,consumer廣播message后,每個consumer都能收到同樣的一條message了。
consumer端需要為自己生成一個專屬於自己的由RabbitMQ隨機命名的queue,然后綁定到fanout類型的exchange上,由此,exchange才知道將message廣播給哪些已經綁定到自己的queue。
示例 emit_log.py
:用於生成一條日志信息,然后廣播給所有consumer。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建一個指定名稱的交換機,並指定類型為fanout,用於將接收到的消息廣播到所有queue中
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = "info: Hello World!"
# 將消息發送給指定的交換機,在fanout類型中,routing_key=''表示不用發送到指定queue中,
# 而是將發送到綁定到此交換機的所有queue
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
示例 receive_logs.py
:這個程序可以多運行幾個,表示有多個consumer需要使用producer發送的消息。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 指定交換機
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 使用RabbitMQ給自己生成一個專有的queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 將queue綁定到指定交換機
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs.')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
注:示例代碼來自 https://www.rabbitmq.com/tutorials/tutorial-three-python.html
4. Routing模式
路由模式中,exchange類型為direct,與發布訂閱模式相似,但是不同之處在於,發布訂閱模式將message不加區分廣播給所有的綁定queue,但是路由模式中,允許queue在綁定exchange時,同時指定 routing_key
,exchange就只會發送message到與 routing_key
匹配的queue中,其他的所有message都將被丟棄。當然,也允許多個queue指定相同的 routing_key
,此時效果就相當於fanout類型的發布訂閱模式了。
producer端:從代碼上看,路由模式和訂閱模式非常相似,唯一不同的是,exchange類型為direct,且發送message時多了一個routing_key參數,exchange會根據routing_key將message發送到對應的queue中。
示例 emit_log_direct.py
:發送不同級別的日志消息到queue中,不同的consumer根據自己指定的routing_key接收message。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 指定交換機名稱和類型
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# severity = 'info'
# severity = 'warning'
severity = 'error'
message = 'Hello World!'
# 與fanout類型的發布訂閱模式相比,只是多了一個routing_key參數
# 交換機會根據routing_key將消息發送到對應的queue中
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
consumer端:在路由模式中,不同的queue可以指定相同的routing_key,同一個queue也可以指定多個routing_key,從exchange角度看,它知道所有綁定到自己的queue,也知道每個queue指定的routing_key,發送消息時,只需要根據queue的routing_key進行發送即可。
示例 receive_logs_direct.py
:這個程序可以多運行幾個,每個程序指定接收不同日志級別的消息。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 指定交換機名稱和類型
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 使用RabbitMQ給自己生成一個專屬於自己的queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 綁定queue到交換機,並指定自己只接受哪些routing_key
# 可以都接收,也可以只接收一種
# for severity in ['error', 'warning', 'info']:
for severity in ['error']:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
注:示例代碼來自 https://www.rabbitmq.com/tutorials/tutorial-four-python.html
5. Topics模式
主題模式的exchange類型為topic,相較於路由模式,主題模式更加靈活,區別就在於它的routing_key可以帶通配符 *
(匹配一個單詞)和 #
(匹配0個或多個單詞),每個單詞以點號分隔,但注意,routing_key的總大小不能超過255個字節。
如果一個message同時匹配了多個queue中的routing_key,那這幾個queue都會收到這個message,如果一個message同時匹配了一個queue中的多個routing_key,那這個queue也只會接收一次這條message,如果一個message沒有匹配上任何routing_key,那么這個message將被丟棄。
如果routing_key定義為 #
(就只有這一個通配符),那么這個queue將接收所有message,就像exchange類型為fanout的發布訂閱模式一樣,如果routing_key兩個通配符都沒有使用,那么這個queue將會接收固定routing_key的message,就像exchange類型為direct的路由模式一樣。
producer端:從代碼上講,producer的代碼與路由模式沒什么區別,只不過在routing_key的傳值上需要注意與想要發送到的queue進行匹配。
示例 emit_log_topic.py
:還是發送日志消息的示例,不過消息類型不再只有級別這一種類型,還添加了發送者的信息,級別與發送者之間以點號分隔。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 指定交換機名稱和類型
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 以點號分隔每個單詞
routing_key = 'anonymous.error'
message = 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
consumer端:consumer根據需要,使用星號 *
和井號 #
兩個通配符對routing_key進行特定主題的匹配,其余部分與路由模式則是一致的。
示例 receive_logs_topic.py
: 這個程序可以多運行幾個,每個程序使用通配符指定不同的主題。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 指定交換機名稱和類型
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 使用RabbitMQ給自己生成一個專屬於自己的queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 可以綁定多個routing_key,routing_key以點號分隔每個單詞
# *可匹配一個單詞,#可以匹配0個或多個單詞
for binding_key in ['anonymous.*']:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
注:示例代碼來自 https://www.rabbitmq.com/tutorials/tutorial-five-python.html
6. RPC模式
RPC遠程調用(Remote Procedure Call)模式其實就是使用消息隊列處理請求的一種方式,通常請求接收到后會立即執行且多個請求是並行執行的,如果一次性來了太多請求,達到了服務端處理請求的瓶頸就會影響性能,但是如果使用消息隊列的方式,最大的一點好處是可以不用立即處理請求,而是將請求放入消息隊列,服務端只需要根據自己的狀態從消息隊列中獲取並處理請求即可。
producer端:RPC模式的客戶端(producer)需要使用到兩個queue,一個用於發送request消息(此queue通常在服務端聲明和創建),一個用於接收response消息。另外需要特別注意的一點是,需要為每個request消息指定一個uuid(correlation_id屬性,類似請求id),用於識別返回的response消息是否屬於對應的request。
示例 rpc_client.py
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
# 創建一個此客戶端專用的queue,用於接收服務端發過來的消息
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
# 判斷接收到的response是否屬於對應request
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4()) # 為該消息指定uuid,類似於請求id
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue', # 將消息發送到該queue
properties=pika.BasicProperties(
reply_to=self.callback_queue, # 從該queue中取消息
correlation_id=self.corr_id, # 為此次消息指定uuid
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
consumer端:服務端也需要使用到兩個queue,一個接收request消息(通常由服務端創建),一個發送response消息(通常由客戶端創建),需要特別注意,發送response消息時需要將對應request的uuid(correlation_id屬性)賦上。
示例 rpc_server.py
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 指定接收消息的queue
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='', # 使用默認交換機
routing_key=props.reply_to, # response發送到該queue
properties=pika.BasicProperties(
correlation_id=props.correlation_id), # 使用correlation_id讓此response與請求消息對應起來
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
# 從rpc_queue中取消息,然后使用on_request進行處理
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
注:示例代碼來自 https://www.rabbitmq.com/tutorials/tutorial-six-python.html