發布/訂閱 系統
1.基本用法
生產者
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 chan = s_conn.channel() #在連接上創建一個頻道 9 10 chan.queue_declare(queue='hello') #聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行 11 chan.basic_publish(exchange='', #交換機 12 routing_key='hello',#路由鍵,寫明將消息發往哪個隊列,本例是將消息發往隊列hello 13 body='hello world')#生產者要發送的消息 14 print("[生產者] send 'hello world") 15 16 s_conn.close()#當生產者發送完消息后,可選擇關閉連接 17 18 19 輸出: 20 [生產者] send 'hello world
消費者
import pika username = 'wt'#指定遠程rabbitmq的用戶名密碼 pwd = '111111' user_pwd = pika.PlainCredentials(username, pwd) s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 chan = s_conn.channel()#在連接上創建一個頻道 chan.queue_declare(queue='hello')#聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行 def callback(ch,method,properties,body): #定義一個回調函數,用來接收生產者發送的消息 print("[消費者] recv %s" % body) chan.basic_consume(callback, #調用回調函數,從隊列里取消息 queue='hello',#指定取消息的隊列名 no_ack=True) #取完一條消息后,不給生產者發送確認消息,默認是False的,即 默認給rabbitmq發送一個收到消息的確認,一般默認即可 print('[消費者] waiting for msg .') chan.start_consuming()#開始循環取消息 輸出: [消費者] waiting for msg . [消費者] recv b'hello world'
2. 實現功能:(1)rabbitmq循環調度,將消息循環發送給不同的消費者,如:消息1,3,5發送給消費者1;消息2,4,6發送給消費者2。
(2)消息確認機制,為了確保一個消息不會丟失,RabbitMQ支持消息的確認 , 一個 ack(acknowlegement) 是從消費者端發送一個確認去告訴RabbitMQ 消息已經接收了、處理了,RabbitMQ可以釋放並刪除掉了。如果一個消費者死掉了(channel關閉、connection關閉、或者TCP連接斷開了)而沒有發送ack,RabbitMQ 就會認為這個消息沒有被消費者處理,並會重新發送到生產者的隊列里,如果同時有另外一個消費者在線,rabbitmq將會將消息很快轉發到另外一個消費者中。 那樣的話你就能確保雖然一個消費者死掉,但消息不會丟失。
這個是沒有超時的,當消費方(consumer)死掉后RabbitMQ會重新轉發消息,即使處理這個消息需要很長很長時間也沒有問題。消息的 acknowlegments 默認是打開的,在前面的例子中關閉了: no_ack = True . 現在刪除這個標識 然后 發送一個 acknowledgment。
(3)消息持久化,將消息寫入硬盤中。 RabbitMQ不允許你重新定義一個已經存在、但屬性不同的queue。需要標記消息為持久化的 - 要通過設置 delivery_mode 屬性為 2來實現。
消息持久化的注意點:
標記消息為持久化並不能完全保證消息不會丟失,盡管已經告訴RabbitMQ將消息保存到磁盤,但RabbitMQ接收到的消息在還沒有保存的時候,仍然有一個短暫的時間窗口。RabbitMQ不會對每個消息都執行同步 --- 可能只是保存到緩存cache還沒有寫入到磁盤中。因此這個持久化保證並不是很強,但這比我們簡單的任務queue要好很多,如果想要很強的持久化保證,可以使用 publisher confirms。
(4)公平調度。在一個消費者未處理完一個消息之前不要分發新的消息給它,而是將這個新消息分發給另一個不是很忙的消費者進行處理。為了解決這個問題我們可以在消費者代碼中使用 channel.basic.qos ( prefetch_count = 1 ),將消費者設置為公平調度。
生產者
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 channel = s_conn.channel() #在連接上創建一個頻道 9 10 channel.queue_declare(queue='task_queue', durable=True) #創建一個新隊列task_queue,設置隊列持久化,注意不要跟已存在的隊列重名,否則有報錯 11 12 message = "Hello World" 13 channel.basic_publish(exchange='', 14 routing_key='worker',#寫明將消息發送給隊列worker 15 body=message, #要發送的消息 16 properties=pika.BasicProperties(delivery_mode=2,)#設置消息持久化,將要發送的消息的屬性標記為2,表示該消息要持久化 17 ) 18 print(" [生產者] Send %r " % message)
消費者
1 import pika 2 import time 3 4 username = 'wt'#指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 channel = s_conn.channel()#在連接上創建一個頻道 9 10 channel.queue_declare(queue='task_queue', durable=True) #創建一個新隊列task_queue,設置隊列持久化,注意不要跟已存在的隊列重名,否則有報錯 11 12 13 def callback(ch, method, properties, body): 14 print(" [消費者] Received %r" % body) 15 time.sleep(1) 16 print(" [消費者] Done") 17 ch.basic_ack(delivery_tag=method.delivery_tag)# 接收到消息后會給rabbitmq發送一個確認 18 19 channel.basic_qos(prefetch_count=1) # 消費者給rabbitmq發送一個信息:在消費者處理完消息之前不要再給消費者發送消息 20 21 channel.basic_consume(callback, 22 queue='worker', 23 #這里就不用再寫no_ack=False了 24 ) 25 channel.start_consuming()
3.交換機
exchange:交換機。生產者不是將消息發送給隊列,而是將消息發送給交換機,由交換機決定將消息發送給哪個隊列。所以exchange必須准確知道消息是要送到哪個隊列,還是要被丟棄。因此要在exchange中給exchange定義規則,所有的規則都是在exchange的類型中定義的。
exchange有4個類型:direct, topic, headers ,fanout
之前,我們並沒有講過exchange,但是我們仍然可以將消息發送到隊列中。這是因為我們用的是默認exchange.也就是說之前寫的:exchange='',空字符串表示默認的exchange。
之前的代碼結構:
1 channel.basic_publish(exchange='', 2 routing_key='hello', 3 body=message)
exchange = '參數'
參數表示exchange 的名字,空字符串是默認或者沒有exchange。消息被路由到某隊列的根據是:routing_key.。如果routing_key的值存在的話。
現在,我們可以用我們自己命名的exchange來代替默認的exchange。
1 channel.basic_publish(exchange='logs',#自己命名exchange為logs 2 routing_key='', 3 body=message)
(1)fanout:廣播類型,生產者將消息發送給所有消費者,如果某個消費者沒有收到當前消息,就再也收不到了(消費者就像收音機)
生產者:(可以用作日志收集系統)
1 import pika 2 import sys 3 username = 'wt' #指定遠程rabbitmq的用戶名密碼 4 pwd = '111111' 5 user_pwd = pika.PlainCredentials(username, pwd) 6 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 7 channel = s_conn.channel() #在連接上創建一個頻道 8 channel.exchange_declare(exchange='logs', 9 type='fanout')#創建一個fanout(廣播)類型的交換機exchange,名字為logs。 10 11 message = "info: Hello World!" 12 channel.basic_publish(exchange='logs',#指定交換機exchange為logs,這里只需要指定將消息發給交換機logs就可以了,不需要指定隊列,因為生產者消息是發送給交換機的。 13 routing_key='',#在fanout類型中,綁定關鍵字routing_key必須忽略,寫空即可 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()
消費者:
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 channel = s_conn.channel() #在連接上創建一個頻道 9 10 channel.exchange_declare(exchange='logs', 11 type='fanout')#消費者需再次聲明一個exchange 以及類型。 12 13 result = channel.queue_declare(exclusive=True)#創建一個隊列,exclusive=True(唯一性)表示在消費者與rabbitmq斷開連接時,該隊列會自動刪除掉。 14 queue_name = result.method.queue#因為rabbitmq要求新隊列名必須是與現存隊列名不同,所以為保證隊列的名字是唯一的,method.queue方法會隨機創建一個隊列名字,如:‘amq.gen-JzTY20BRgKO-HjmUJj0wLg‘。 15 16 channel.queue_bind(exchange='logs', 17 queue=queue_name)#將交換機logs與接收消息的隊列綁定。表示生產者將消息發給交換機logs,logs將消息發給隨機隊列queue,消費者在隨機隊列queue中取消息 18 19 print(' [消費者] Waiting for logs. To exit press CTRL+C') 20 21 def callback(ch, method, properties, body): 22 print(" [消費者] %r" % body) 23 24 channel.basic_consume(callback,#調用回調函數從queue中取消息 25 queue=queue_name, 26 no_ack=True)#設置為消費者不給rabbitmq回復確認。 27 28 channel.start_consuming()#循環等待接收消息。
這樣,開啟多個消費者后,會同時從生產者接收相同的消息。
(2)direct:關鍵字類型。功能:交換機根據生產者消息中含有的不同的關鍵字將消息發送給不同的隊列,消費者根據不同的關鍵字從不同的隊列取消息
生產者:不用創建對列
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 channel = s_conn.channel() #在連接上創建一個頻道 9 10 channel.exchange_declare(exchange='direct_logs', 11 type='direct')#創建一個交換機並聲明exchange的類型為:關鍵字類型,表示該交換機會根據消息中不同的關鍵字將消息發送給不同的隊列 12 13 severity = 'info'#severity這里只能為一個字符串,這里為‘info’表明本生產者只將下面的message發送到info隊列中,消費者也只能從info隊列中接收info消息 14 message = 'Hello World!' 15 channel.basic_publish(exchange='direct_logs',#指明用於發布消息的交換機、關鍵字 16 routing_key=severity,#綁定關鍵字,即將message與關鍵字info綁定,明確將消息發送到哪個關鍵字的隊列中。 17 body=message) 18 print(" [生產者] Sent %r:%r" % (severity, message)) 19 connection.close()
消費者:
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 channel = s_conn.channel() #在連接上創建一個頻道 9 10 channel.exchange_declare(exchange='direct_logs', 11 type='direct')#創建交換機,命名為‘direct_logs’並聲明exchange類型為關鍵字類型。 12 13 result = channel.queue_declare(exclusive=True)#創建隨機隊列,當消費者與rabbitmq斷開連接時,這個隊列將自動刪除。 14 queue_name = result.method.queue#分配隨機隊列的名字。 15 16 severities = ['info','err']#可以接收綁定關鍵字info或err的消息,列表中也可以只有一個 17 if not severities:#判斷如果輸入有誤,輸出用法 18 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for severity in severities: 22 channel.queue_bind(exchange='direct_logs',#將交換機、隊列、關鍵字綁定在一起,使消費者只能根據關鍵字從不同隊列中取消息 23 queue=queue_name, 24 routing_key=severity)#該消費者綁定的關鍵字。 25 26 print(' [消費者] Waiting for logs. To exit press CTRL+C') 27 28 def callback(ch, method, properties, body):#定義回調函數,接收消息 29 print(" [消費者] %r:%r" % (method.routing_key, body)) 30 31 channel.basic_consume(callback, 32 queue=queue_name, 33 no_ack=True)#消費者接收消息后,不給rabbimq回執確認。 34 35 channel.start_consuming()#循環等待消息接收。
(3)topics:模糊匹配類型。比較常用
生產者:
1 import pika 2 import sys 3 4 username = 'wt' #指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 channel = s_conn.channel() #在連接上創建一個頻道 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') # 創建模糊匹配類型的exchange。。 12 13 routing_key = '[warn].kern'##這里關鍵字必須為點號隔開的單詞,以便於消費者進行匹配。引申:這里可以做一個判斷,判斷產生的日志是什么級別,然后產生對應的routing_key,使程序可以發送多種級別的日志 14 message = 'Hello World!' 15 channel.basic_publish(exchange='topic_logs',#將交換機、關鍵字、消息進行綁定 16 routing_key=routing_key, # 綁定關鍵字,將隊列變成[warn]日志的專屬隊列 17 body=message) 18 print(" [x] Sent %r:%r" % (routing_key, message)) 19 s_conn.close()
消費者:
1 import pika 2 import sys 3 4 username = 'wt'#指定遠程rabbitmq的用戶名密碼 5 pwd = '111111' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接 8 channel = s_conn.channel()#在連接上創建一個頻道 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') # 聲明exchange的類型為模糊匹配。 12 13 result = channel.queue_declare(exclusive=True) # 創建隨機一個隊列當消費者退出的時候,該隊列被刪除。 14 queue_name = result.method.queue # 創建一個隨機隊列名字。 15 16 binding_keys = ['[warn]', 'info.*']#綁定鍵。‘#’匹配所有字符,‘*’匹配一個單詞。這里列表中可以為一個或多個條件,能通過列表中字符匹配到的消息,消費者都可以取到 17 if not binding_keys: 18 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for binding_key in binding_keys:#通過循環綁定多個“交換機-隊列-關鍵字”,只要消費者在rabbitmq中能匹配到與關鍵字相應的隊列,就從那個隊列里取消息 22 channel.queue_bind(exchange='topic_logs', 23 queue=queue_name, 24 routing_key=binding_key) 25 26 print(' [*] Waiting for logs. To exit press CTRL+C') 27 28 29 def callback(ch, method, properties, body): 30 print(" [x] %r:%r" % (method.routing_key, body)) 31 32 33 channel.basic_consume(callback, 34 queue=queue_name, 35 no_ack=True)#不給rabbitmq發送確認 36 37 channel.start_consuming()#循環接收消息
4.遠程過程調用(RPC)Remote procedure call
RPC執行過程:

代碼:
或者 看下邊一篇好理解一點
前面的例子都有個共同點,就是發送端發送消息出去后沒有結果返回。如果只是單純發送消息,當然沒有問題了,但是在實際中,常常會需要接收端將收到的消息進行處理之后,返回給發送端。
處理方法描述:發送端在發送信息前,產生一個接收消息的臨時隊列,該隊列用來接收返回的結果。其實在這里接收端、發送端的概念已經比較模糊了,因為發送端也同樣要接收消息,接收端同樣也要發送消息,所以這里筆者使用另外的示例來演示這一過程。
示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個自然數N發送給計算節點,計算節點將N值加1后,返回給控制中心。這里用center.py模擬控制中心,compute.py模擬計算節點。
compute.py代碼分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
#!/usr/bin/env python
#coding=utf8
import
pika
#連接rabbitmq服務器
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'localhost'
))
channel
=
connection.channel()
#定義隊列
channel.queue_declare(queue
=
'compute_queue'
)
print
' [*] Waiting for n'
#將n值加1
def
increase(n):
return
n
+
1
#定義接收到消息的處理方法
def
request(ch, method, properties, body):
print
" [.] increase(%s)"
%
(body,)
response
=
increase(
int
(body))
#將計算結果發送回控制中心
ch.basic_publish(exchange
=
'',
routing_key
=
properties.reply_to,
body
=
str
(response))
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_qos(prefetch_count
=
1
)
channel.basic_consume(request, queue
=
'compute_queue'
)
channel.start_consuming()
|
計算節點的代碼比較簡單,值得一提的是,原來的接收方法都是直接將消息打印出來,這邊進行了加一的計算,並將結果發送回控制中心。
center.py代碼分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
#!/usr/bin/env python
#coding=utf8
import
pika
class
Center(
object
):
def
__init__(
self
):
self
.connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'localhost'
))
self
.channel
=
self
.connection.channel()
#定義接收返回消息的隊列
result
=
self
.channel.queue_declare(exclusive
=
True
)
self
.callback_queue
=
result.method.queue
self
.channel.basic_consume(
self
.on_response,
no_ack
=
True
,
queue
=
self
.callback_queue)
#定義接收到返回消息的處理方法
def
on_response(
self
, ch, method, props, body):
self
.response
=
body
def
request(
self
, n):
self
.response
=
None
#發送計算請求,並聲明返回隊列
self
.channel.basic_publish(exchange
=
'',
routing_key
=
'compute_queue'
,
properties
=
pika.BasicProperties(
reply_to
=
self
.callback_queue,
),
body
=
str
(n))
#接收返回的數據
while
self
.response
is
None
:
self
.connection.process_data_events()
return
int
(
self
.response)
center
=
Center()
print
" [x] Requesting increase(30)"
response
=
center.request(
30
)
print
" [.] Got %r"
%
(response,)
|
上例代碼定義了接收返回數據的隊列和處理方法,並且在發送請求的時候將該隊列賦值給reply_to,在計算節點代碼中就是通過這個參數來獲取返回隊列的。
打開兩個終端,一個運行代碼python compute.py,另外一個終端運行center.py,如果執行成功,應該就能看到效果了。
筆者在測試的時候,出了些小問題,就是在center.py發送消息時沒有指明返回隊列,結果compute.py那邊在計算完結果要發回數據時報錯,提示routing_key不存在,再次運行也報錯。用rabbitmqctl list_queues查看隊列,發現compute_queue隊列有1條數據,每次重新運行compute.py的時候,都會重新處理這條數據。后來使用/etc/init.d/rabbitmq-server restart重新啟動下rabbitmq就ok了。
參考文章:http://www.rabbitmq.com/tutorials/tutorial-six-python.html