什么是rabbitMQ
rabbitMQ是一款基於AMQP協議的消息中間件,它能夠在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。而且使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。
rabbitMQ基本概念
- exchange: producer只能將消息發送給exchange。而exchange負責將消息發送到queues。Exchange必須准確的知道怎么處理它接受到的消息,是被發送到一個特定的queue還是許多quenes,還是被拋棄,這些規則則是通過exchange type來定義。主要的type有direct,topic,headers,fanout。具體針對不同的場景使用不同的type。
- queue: 消息隊列,消息的載體。接收來自exchange的消息,然后再由consumer取出。exchange和queue是可以一對多的,它們通過routingKey來綁定。
- Producer:生產者,消息的來源,消息必須發送給exchange。而不是直接給queue
- Consumer:消費者,直接從queue中獲取消息進行消費,而不是從exchange。
從以上可以看出Rabbitmq工作原理大致就是producer把一條消息發送給exchange。rabbitMQ根據routingKey負責將消息從exchange發送到對應綁定的queue中去,這是由rabbitMQ負責做的。而consumer只需從queue獲取消息即可。基本效果圖如下:

持久化問題
消息確認機制
這里就會有一個問題,如果consumer在執行任務時需要花費一些時間,這個時候如果突然掛了,消息還沒有被完成,消息豈不是丟失了,為了不讓消息丟失,rabbitmq提供了消息確認機制,consumer在接收到,執行完消息后會發送一個ack給rabbitmq告訴它可以從queue中移除消息了。如果沒收到ack。Rabbitmq會重新發送此條消息,如果有其他的consumer在線,將會接收並消費這條消息。消息確認機制是默認打開的。如果想關閉它只需要設置no_ack=true。在此處我們不需要設置。默認如下就行。
隊列持久化
- 除了consumer之外我們還得確保rabbitMQ掛了之后消息不被丟失。這里我們就需要確保隊列queue和消息messages都得是持久化的。
- 隊列的持久話需要設置durable屬性。
channel.queue_declare(queue= task_queue, durable=True)
消息持久化
消息的持久話則是通過delivery_mode屬性,設置值為2即可。
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
簡單發送模型
在rabbit MQ里消息永遠不能被直接發送到queue。這里我們通過提供一個空字符串來使用默認的exchange。這個exchange是特殊的,它可以根據routingKey把消息發送給指定的queue。所以我們的設計看起來如下所示:

發送端:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
#聲明queue
channel.queue_declare(queue='hello')
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
接收端:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
工作隊列模式
一個生產者發送消息到隊列中,有多個消費者共享一個隊列,每個消費者獲取的消息是唯一的。
消息公平分發原則(類似負載均衡)
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1)
生產者端:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
消費者端:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
Publish\Subscribe(消息發布\訂閱)之廣播模式
在前面2個示例我們都適用默認的exchange。這里我們將自己定義一個exchange。並設置type為fanout。它可以將消息廣播給綁定的每一個queue。而不再是某一個queue。

生產者端:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
消費者端:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
Publish\Subscribe(消息發布\訂閱)之direct模式
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

生產者端:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
消費者端:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
運行結果:
發送消息

只收到warning的消息

只收到error的消息

error和waring的都能收到

Publish\Subscribe(消息發布\訂閱)之Topic模式
這種模型是最靈活的,相比較於direct的完全匹配和fanout的廣播。Topic可以用類似正則的手法更好的匹配來滿足我們的應用。下面我們首先了解一下topic類型的exchange。
topic類型的routing_key不可以是隨意的單詞,它必須是一系列的單詞組合,中間以點號隔開,譬如“quick.orange.rabbit”這個樣子。發送消息的routing_key必須匹配上綁定到隊列的routing_key。消息才會被發送。
此外還有個重要的地方要說明,在如下代碼處綁定的routing_key種可以有*和#2種字符。它們代表的意義如下:
- *(星號) :可以匹配任意一個單詞
- #(井號) :可以匹配0到多個單詞

由圖可知,Q1匹配3個單詞中間為orange的routing_key ,而Q2可以匹配3個單詞最后一個單詞為rabbit和第一個單詞為lazy后面可以有多個單詞的routing_key。
生產者端:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
消費者端:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
RPC模式
當我們需要在遠程服務器上執行一個方法並等待它的結果的時候,我們將這種模式稱為RPC。
在rabbit MQ中為了能讓client收到server端的response message。需要定義一個callback queue,不過現在有一個問題,就是每次請求都會創建一個callback queue .這樣的效率是極其低下的。幸運的是我們可以通過correlation_id為每一個client創建一個單獨的callback queue。通過指定correlation_id我們可以知道callback queue中的消息屬於哪個client。要做到這樣只需client每次發送請求時帶上這唯一的correlation_id。然后當我們從callback queue中收到消息時,我們能基於 correlation_id 匹配上我們的消息。匹配不上的消息將被丟棄,看上去就像下圖這樣:

總結一下流程如下:
-
- client發起請求,請求中帶有2個參數reply_to和correlation_id
- 請求發往rpc_queue
- server獲取到rpc_queue中的消息,處理完畢后,將結果發往reply_to指定的callback queue
- client 獲取到callback queue中的消息,匹配correlation_id,如果匹配就獲取,不匹配就丟棄.
生產者端:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
消費者端:
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
