RabbitMQ
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
安裝
http://erlang.org/download/otp_win64_18.3.exe #依賴包erlang
http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1.exe
pip install pika
簡單的通信
import pika #producer端 #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #創建channel channel = connection.channel() #聲明queue channel.queue_declare(queue='testMQ') channel.basic_publish(exchange='',#Producer只能發送到exchange,它是不能直接發送到queue的,發送到默認exchange routing_key="testMQ",#路由key發送指定隊列 body="hello this is test!") #消息 print(" [x] Sent 'hello this is test!'") #關閉連接 connection.close()
import pika #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #創建channel channel = connection.channel() #如果生產者先運行並創建了隊列這里就可以不用聲明,但是有可能消費者先運行 下面的basic_consume就會因為沒有隊列報錯。 channel.queue_declare(queue="testMQ") #定義回調函數用於取出隊列中的數據 def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue="testMQ", no_ack=True) #不用確認消息 print(" [*] Waiting for messages. To exit press Ctrl + C ") channel.start_consuming()#監聽數據
默認情況下,RabbitMQ 會順序的分發每個Message。當每個收到ack后,會將該Message刪除,然后將下一個Message分發到下一個Consumer。這種分發方 式叫做round-robin。
acknowledgment 消息確認
每個Consumer可能需要一段時間才能處理完收到的數據。如果在這個過程中,Consumer出錯了,異常退出了,而數據還沒有處理完成,那么 非常不幸,這段數據就丟失了。因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數據后,而不管是否處理完 成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。
如果一個Consumer異常退出了,它處理的數據能夠被另外的Consumer處理,這樣數據在這種情況下就不會丟失了(注意是這種情況下)。
為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數據后發送ack。
在處理數據后發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。
如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數據也不會丟失。
這里並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。
import pika #producer端 #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #生成通道 channel = connection.channel() #聲明queue 生產者必須聲明 channel.queue_declare(queue='testMQ') channel.basic_publish(exchange='', routing_key="testMQ", body="hello this is test!") print(" [x] Sent 'hello this is test!'") #關閉連接 connection.close()
#client
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() #如果生產者先運行並創建了隊列這里就可以不用聲明,但是有可能消費者先運行 下面的basic_consume就會因為沒有隊列報錯。 #channel.queue_declare(queue="testMQ") #已經創建的隊列不是durable再賦值durable也無法改變 channel.queue_declare(queue="task_mq",durable=True) #定義回調函數用於取出隊列中的數據 def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(5) print("ok") ch.basic_ack(delivery_tag = method.delivery_tag) #消息持久化 channel.basic_consume(callback, queue = "task_mq", no_ack = False) #no-ack = False,如果生產者遇到情況(its channel is closed, connection is closed, #or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。但是生產者掛了,消息就沒有了!!! print(" [*] Waiting for messages. To exit press Ctrl + C ") channel.start_consuming()
Message durability消息持久化
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() #確保隊列不丟失 channel.queue_declare(queue='task_mq', durable=True) channel.basic_publish(exchange='', routing_key='task_mq', body="will i come back!", properties=pika.BasicProperties( delivery_mode = 2,#make messages persistent )) print(" [x] Sent 'will i come back!'") connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() #如果生產者先運行並創建了隊列這里就可以不用聲明,但是有可能消費者先運行 下面的basic_consume就會因為沒有隊列報錯。 #channel.queue_declare(queue="testMQ") #已經創建的隊列,再賦值durable是無法改變的,rabbitmq已經再維護它了。 channel.queue_declare(queue="task_mq",durable=True) #定義回調函數用於取出隊列中的數據 def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(5) print("ok") ch.basic_ack(delivery_tag = method.delivery_tag) #消息持久化 channel.basic_consume(callback, queue = "task_mq", no_ack = False) #no-ack = False,如果生產者遇到情況(its channel is closed, connection is closed, #or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。 #也就是說如果生產者掛了,消息就沒有了!!! print(" [*] Waiting for messages. To exit press Ctrl + C ") channel.start_consuming()
為了數據不丟失,我們采用了: 在數據處理結束后發送ack,這樣RabbitMQ Server會認為Message Deliver 成功。 持久化queue,可以防止RabbitMQ Server 重啟或者crash引起的數據丟失。 持久化Message,理由同上。 但是這樣能保證數據100%不丟失嗎? 答案是否定的。問題就在與RabbitMQ需要時間去把這些信息存到磁盤上,這個time window雖然短,但是它的確還是有。
在這個時間窗口內如果數據沒有保存,數據還會丟失。還有另一個原因就是RabbitMQ並不是為每個Message都做fsync:
它可能僅僅是把它保存到Cache里,還沒來得及保存到物理磁盤上。
消息公平分發
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() #確保隊列不丟失 channel.queue_declare(queue='task_mq', durable=True) channel.basic_publish(exchange='', routing_key='task_mq', body="will i come back!", properties=pika.BasicProperties( delivery_mode = 2,#make messages persistent )) print(" [x] Sent 'will i come back!'") connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # make message persistent channel.queue_declare(queue='testMQ') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(5) print('ok') ch.basic_ack(delivery_tag = method.delivery_tag) #告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='testMQ', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
Publish\Subscribe(消息發布\訂閱)
之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息。
- fanout: 所有bind到此exchange的queue都可以接收消息
- direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
- topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

import pika import sys #發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。 # 所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定義exchange 類型為fanout channel.exchange_declare(exchange='logs', type='fanout') #發送消息 message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()

import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定義exchange channel.exchange_declare(exchange='logs', type='fanout') #定義隊列每一個訂閱者都有自己的隊列,隊列名是隨機的 #Consumer關閉連接時,這個queue要被deleted。可以加個exclusive的參數。 result = channel.queue_declare(exclusive=True) #獲取隊列名 queue_name = result.method.queue #將隊列和exchangebanding 默認發布不是直接發送到隊列而是先到exchange #由exchange分發給訂閱者 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
有選擇的接收消息(exchange type=direct)
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
exchange X和兩個queue綁定在一起。C1的binding key是error。C2的binding key是info,error和warning。當P publish key是info時,exchange會把它放到C2。如果是error那么就會到C1,C2。

import pika import sys #連接rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters( host="localhost" )) #生成channel channel = connection.channel() #設置exchange為direct模式 channel.exchange_declare(exchange="direct_logs", type = "direct") #隊列關鍵字 自己輸入或默認info serverity = sys.argv[1] if len(sys.argv) > 1 else "info" #消息 自己輸入或默認 message = ' '.join(sys.argv[2:]) or "hello world" #發布 channel.basic_publish(exchange="direct_logs", routing_key=serverity, #根據關鍵字發送到指定隊列 body=message) print(" [x] Sent %r:%r" % (serverity, message)) connection.close()

import pika import sys #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters( host="localhost" )) #生成channel channel = connection.channel() #聲明exchang channel.exchange_declare(exchange="direct_logs", type="direct") #聲明專用隊列隊列名隨機 result = channel.queue_declare(exclusive=True) #獲取隊列名 queue_name = result.method.queue #用戶定義消息關鍵字 serverities = sys.argv[1:] if not serverities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for serverity in serverities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=serverity)#關鍵字 print(' [*] Waiting for logs. To exit prees CTRL+C') #獲取數據 def callback(ch, method, propertes, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
Topic exchange
對於Message的routing_key是有限制的,不能使任意的。格式是以點號“."分割的字符表。比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。
- # 表示可以匹配 0 個 或 多個 單詞
- * 表示只能匹配 一個 單詞

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') #用戶輸入關鍵字定義 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' #自定義發送消息 message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() #執行 > python rabbitmq_publisher_topic.py nginx.error nginx is down #c2接收 > python rabbitmq_publisher_topic.py login.info koka loggin #c1接收 > python rabbitmq_publisher_topic.py kernel.critical kernel fault #都收不到

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #用戶定義接收關鍵字 binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: #隊列綁定關鍵字 channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') #回調處理接收信息 def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() #開始監聽 #執行 c1 python rabbitmq_subcriber_topic.py *.info c2 python rabbitmq_subvriber_topic.py #.erro
遠程過程調用 RPC
1. 客戶端接口 Client interface
為了展示一個RPC服務是如何使用的,我們將創建一段很簡單的客戶端class。 它將會向外提供名字為call的函數,這個call會發送RPC請求並且阻塞知道收到RPC運算的結果。代碼如下:
fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
2.回調函數隊列 Callback queue
總體來說,在RabbitMQ進行RPC遠程調用是比較容易的。client發送請求的Message然后server返回響應結果。為了收到響應 client在publish message時需要提供一個”callback“(回調)的queue地址。code如下:
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
AMQP 預定義了14個屬性。它們中的絕大多很少會用到。以下幾個是平時用的比較多的:
- delivery_mode: 持久化一個Message(通過設定值為2)。其他任意值都是非持久化。
- content_type: 描述mime-type 的encoding。比如設置為JSON編碼:設置該property為application/json。
- reply_to: 一般用來指明用於回調的queue(Commonly used to name a callback queue)。
- correlation_id: 在請求中關聯處理RPC響應(correlate RPC responses with requests)。
3.關聯ID (Correlation ID)
在上個小節里,實現方法是對每個RPC請求都會創建一個callback queue。這是不高效的。幸運的是,在這里有一個解決方法:為每個client創建唯一的callback queue。
這又有其他問題了:收到響應后它無法確定是否是它的,因為所有的響應都寫到同一個queue了。上一小節的correlation_id在這種情況下就派上用場了:對於每個request,
都設置唯一的一個值,在收到響應后,通過這個值就可以判斷是否是自己的響應。如果不是自己的響應,就不去處理。
4.概要(Summary)

#server.py
import pika
#建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
#生成通道 channel = connection.channel() #聲明客戶端請求隊列 channel.queue_declare(queue='rpc_queue') #定義操作函數 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) #處理請求 def on_request(ch, method, props, body): #消息體 n = int(body) #打印請求 print(" [.] fib(%s)" % (n,)) #回應結果 response = fib(n) #回應請求 ch.basic_publish(exchange='', routing_key=props.reply_to,#發送至客戶端的回調隊列 callback queue properties=pika.BasicProperties(correlation_id = \ props.correlation_id), #回應消息關聯corr_id #消息體 body=str(response)) #確保客戶端發送請求被收到 ch.basic_ack(delivery_tag = method.delivery_tag) #make message persistent #告訴RabbitMQ服務端當前消息還沒處理完的時候就不要再給我發新消息了。 channel.basic_qos(prefetch_count=1) #接收客戶端請求,調用on_request函數處理 channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming() #監聽客戶端請求
#client.py
import pika import uuid class FibonacciRpcClient(object): def __init__(self): #連接rabbitmq self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) #定義通道 self.channel = self.connection.channel() #定義專用隊列,隊列名隨機,斷開連接時刪除隊列 result = self.channel.queue_declare(exclusive=True) #獲取隊列名 self.callback_queue = result.method.queue #接收服務端回應的callback_queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #接收消息 def on_response(self, ch, method, props, body): #服務端回應的correlation_id等於請求的id 接收數據 if self.corr_id == props.correlation_id: self.response = body #發起請求 def call(self, n): self.response = None #生成corr_id self.corr_id = str(uuid.uuid4()) #發起請求 self.channel.basic_publish(exchange='', routing_key='rpc_queue',#發送至rpc_queue隊列 properties=pika.BasicProperties( reply_to = self.callback_queue,#回調隊列 #告訴服務端從這個隊列回應請求 correlation_id = self.corr_id, #請求關聯corr_id ), body=str(n))#消息 #監聽回應消息 while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") #發送請求 response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
server端: [x] Awaiting RPC requests
[.] fib(30) client端: [x] Requesting fib(30) [.] Got 832040