一.介紹
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
二.引入
基於Queue實現生產者消費者模型

#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是在某台服務器上的RabbitMQ Server實現消息隊列
實現最簡單的隊列通信:
#!/usr/bin/env python import pika # ######################### 生產者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()
#聲明query channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
#!/usr/bin/env python import pika # ########################## 消費者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #接收端也需要聲明query,如果不聲明就必須要先運行發送端,參照之前的socket通信 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
三.幾種模式
#四種交換器 fanout: 所有bind到此exchange的queue都可以接收消息 direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息 表達式符號說明:#代表一個或多個字符,*代表任何字符 例:#.a會匹配a.a,aa.a,aaa.a等 *.a會匹配a.a,b.a,c.a等 注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout headers: 通過headers 來決定把消息發給哪些queue
1.work模式
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差不多

import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 聲明queue channel.queue_declare(queue='task_queue') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent %r" % message) connection.close()

#_*_coding:utf-8_*_ import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] Done") print("method.delivery_tag",method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='task_queue', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
啟動一個發送端,啟動三個接收端,通過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上。
注意:
1)消息不丟失
設置消息應答模式no-ack = False,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。
如果no-ack = True,那么如果殺死正在執行任務的消費者,會丟失正在處理的消息,也會丟失已經分發給這個消費者但尚未處理的消息。
#接受端 channel.basic_consume(callback, queue='hello', no_ack=True ) #實際上按照上面的參數順序會報錯,需要這么寫 channel.basic_consume('hello',callback,False)
我們已經了解了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍將失去!
當RabbitMQ退出或者崩潰,將會丟失隊列和消息。除非你不要隊列和消息。
所以兩件事兒必須保證消息不被丟失:我們必須把“隊列”和“消息”設為持久化。
2)接受端和發送端都需要設置durable=true ,其實保證的是隊列的持久化
channel.queue_declare(queue='hello', durable=True)
3)要實現消息持久化還需要設置發送端delivery_mode = 2
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count=1)
帶消息持久化+公平分發的完整代碼
發送端
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1')) channel = connection.channel() channel.queue_declare(queue='task_queue',durable=True) message = 'Hello World!' channel.basic_publish(exchange='',routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2,)) print('[x] sent %r' %message) connection.close()
接收端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='127.0.0.1')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume('task_queue',callback,False) channel.start_consuming()
2.發布/訂閱模式
之前的例子基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息。
發送方:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
接收方
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue_name,callback,True) channel.start_consuming()
應用場景
比如一個商城系統需要在管理員上傳商品新的圖片時,前台系統必須更新圖片,日志系統必須記錄相應的日志,那么就可以將兩個隊列綁定到圖片上傳交換器上,一個用於前台系統更新圖片,另一個用於日志系統記錄日志。
3.路由模式
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。例如:我們只想把error級別的日志寫到磁盤文件中,而其它級別的日志消息則過濾掉。
在這里我們將要使用direct類型的exchange。Direct類型exchange的路由算法是很簡單的:要想一個消息能到達這個隊列,需要binding key和routing key正好能匹配得上。
發送端
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
接受端:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare('anne') queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C')
應用場景
利用消費者能夠有選擇性的接收消息的特性,比如我們商城系統的后台管理系統對於商品進行修改、刪除、新增操作都需要更新前台系統的界面展示,而查詢操作確不需要,那么這兩個隊列分開接收消息就比較好。
4.主題模式
上面的路由模式是根據路由key進行完整的匹配(完全相等才發送消息),這里的通配符模式通俗的來講就是模糊匹配。
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
- # 表示可以匹配 0 個 或 多個 單詞
- * 表示只能匹配 一個 單詞

在這個例子中,我們將會發送一些描述動物的消息。Routing key的第一個單詞是描述速度的,第二個單詞是描述顏色的,第三個是描述物種的:“<speed>.<colour>.<species>”。 這里我們創建三個Binding:Binding key為”.orange.”的Q1,和binding key為”..rabbit”和”lazy.#”的Q2。 這些binding可以總結為: Q1對所有橘色的(orange)的動物感興趣; Q2希望能拿到所有兔子的(rabbit)信息,還有比較懶惰的(lazy.#)動物信息。 一條以” quick.orange.rabbit”為routing key的消息將會推送到Q1和Q2兩個queue上,routing key為“lazy.orange.elephant”的消息同樣會被推送到Q1和Q2上。但如果routing key為”quick.orange.fox”的話,消息只會被推送到Q1上;routing key為”lazy.brown.fox”的消息會被推送到Q2上,routing key為"lazy.pink.rabbit”的消息也會被推送到Q2上,但同一條消息只會被推送到Q2上一次。 如果在發送消息時所指定的exchange和routing key在消費者端沒有對應的exchange和binding key與之綁定的話,那么這條消息將會被丟棄掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing為”lazy.orange.male.rabbit”的消息,將會被推到Q2上。
發送方
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
接收方
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(queue_name, callback, True) channel.start_consuming()
總結:
關於 RabbitMQ 的五種隊列,其實實際使用最多的是最后一種主題模式,通過模糊匹配,使得操作更加自如。那么我們總結一下有交換器參與的隊列(最后三種隊列)工作方式如下
生產者:
1.鏈接Rabbit MQ--->2.獲取信道--->3.聲明交換器--->4.創建消息--->5.發布消息--->6.關閉信道--->7.關閉鏈接
消費者:
1.鏈接Rabbit MQ--->2.獲取信道--->3.聲明交換器--->4.聲明隊列--->5.把隊列和交換器綁定-->6.消費消息--->7.關閉信道--->8.關閉鏈接
四.遠程調用(RPC)
前面介紹的都是一個發送方,一個接收方,如果我們發送方發送了之后想從接收方獲取結果呢?
首先,發送方發送請求時,聲明結果返回的隊列(即結果返回時到哪個隊列取結果),並在發送消息時將需要返回結果的隊列以及生成的corr_id一並發送,
接收方收到請求后,將結果發送到指定的隊列,並附帶上corr_id。
最后原先的發送方再到指定的隊列取結果。
實現一個遠程調用斐波那契數列
RPC server
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') #綁定取消息的隊列 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) #處理請求的方法 def on_request(ch, method, props, body): n = int(body) #消息主題為str格式需要轉成int print(" [.] fib(%s)" % n) response = fib(n) #調用fib(),生成返回結果 ch.basic_publish(exchange='', routing_key=props.reply_to,#返回指定隊列 properties=pika.BasicProperties(correlation_id= props.correlation_id),#將請求附帶的correlation_id返回 body=str(response)) #將結果轉成str返回 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume('rpc_queue',on_request) #從rpc_queue取消息,並調用on_request方法處理 print(" [x] Awaiting RPC requests") channel.start_consuming()
RPC client
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare('anne') #綁定返回隊列,隨機生成 self.callback_queue = result.method.queue self.channel.basic_consume(self.callback_queue,self.on_response,True, ) #從綁定的返回隊列取結果 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: #檢測是否與發送的corr_id一致 self.response = body #取出返回結果傳給response def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) #生成唯一的corr_id self.channel.basic_publish(exchange='', routing_key='rpc_queue', #發送的隊列 properties=pika.BasicProperties( reply_to=self.callback_queue, #返回的隊列 correlation_id=self.corr_id, #發送corr_id ), body=str(n))#消息主體 while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) #調用Call()方法 print(" [.] Got %r" % response) #打印返回結果