Python操作rabbitmq
1.基本用法
生產者
[](javascript:void(0)😉
1 import pika
2 import sys
3
4 username = 'wt' #指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 chan = s_conn.channel() #在連接上創建一個頻道
9
10 chan.queue_declare(queue='hello') #聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
11 chan.basic_publish(exchange='', #交換機
12 routing_key='hello',#路由鍵,寫明將消息發往哪個隊列,本例是將消息發往隊列hello
13 body='hello world')#生產者要發送的消息
14 print("[生產者] send 'hello world")
15
16 s_conn.close()#當生產者發送完消息后,可選擇關閉連接
17
18
19 輸出:
20 [生產者] send 'hello world
[](javascript:void(0)😉
消費者
[](javascript:void(0)😉
import pika
username = 'wt'#指定遠程rabbitmq的用戶名密碼
pwd = '111111'
user_pwd = pika.PlainCredentials(username, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
chan = s_conn.channel()#在連接上創建一個頻道
chan.queue_declare(queue='hello')#聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
def callback(ch,method,properties,body): #定義一個回調函數,用來接收生產者發送的消息
print("[消費者] recv %s" % body)
chan.basic_consume(callback, #調用回調函數,從隊列里取消息
queue='hello',#指定取消息的隊列名
no_ack=True) #取完一條消息后,不給生產者發送確認消息,默認是False的,即 默認給rabbitmq發送一個收到消息的確認,一般默認即可
print('[消費者] waiting for msg .')
chan.start_consuming()#開始循環取消息
輸出:
[消費者] waiting for msg .
[消費者] recv b'hello world'
[](javascript:void(0)😉
2. 實現功能:
(1)rabbitmq循環調度,將消息循環發送給不同的消費者,如:消息1,3,5發送給消費者1;消息2,4,6發送給消費者2。
(2)消息確認機制,為了確保一個消息不會丟失,RabbitMQ支持消息的確認 , 一個 ack(acknowlegement) 是從消費者端發送一個確認去告訴RabbitMQ 消息已經接收了、處理了,RabbitMQ可以釋放並刪除掉了。如果一個消費者死掉了(channel關閉、connection關閉、或者TCP連接斷開了)而沒有發送ack,RabbitMQ 就會認為這個消息沒有被消費者處理,並會重新發送到生產者的隊列里,如果同時有另外一個消費者在線,rabbitmq將會將消息很快轉發到另外一個消費者中。 那樣的話你就能確保雖然一個消費者死掉,但消息不會丟失。
這個是沒有超時的,當消費方(consumer)死掉后RabbitMQ會重新轉發消息,即使處理這個消息需要很長很長時間也沒有問題。消息的 acknowlegments 默認是打開的,在前面的例子中關閉了: no_ack = True . 現在刪除這個標識 然后 發送一個 acknowledgment。
(3)消息持久化,將消息寫入硬盤中。 RabbitMQ不允許你重新定義一個已經存在、但屬性不同的queue。需要標記消息為持久化的 - 要通過設置 delivery_mode 屬性為 2來實現。
消息持久化的注意點:
標記消息為持久化並不能完全保證消息不會丟失,盡管已經告訴RabbitMQ將消息保存到磁盤,但RabbitMQ接收到的消息在還沒有保存的時候,仍然有一個短暫的時間窗口。RabbitMQ不會對每個消息都執行同步 --- 可能只是保存到緩存cache還沒有寫入到磁盤中。因此這個持久化保證並不是很強,但這比我們簡單的任務queue要好很多,如果想要很強的持久化保證,可以使用 publisher confirms。
(4)公平調度。在一個消費者未處理完一個消息之前不要分發新的消息給它,而是將這個新消息分發給另一個不是很忙的消費者進行處理。為了解決這個問題我們可以在消費者代碼中使用 channel.basic.qos ( prefetch_count = 1 ),將消費者設置為公平調度。
生產者
[](javascript:void(0)😉
1 import pika
2 import sys
3
4 username = 'wt' #指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 channel = s_conn.channel() #在連接上創建一個頻道
9
10 channel.queue_declare(queue='task_queue', durable=True) #創建一個新隊列task_queue,設置隊列持久化,注意不要跟已存在的隊列重名,否則有報錯
11
12 message = "Hello World"
13 channel.basic_publish(exchange='',
14 routing_key='worker',#寫明將消息發送給隊列worker
15 body=message, #要發送的消息
16 properties=pika.BasicProperties(delivery_mode=2,)#設置消息持久化,將要發送的消息的屬性標記為2,表示該消息要持久化
17 )
18 print(" [生產者] Send %r " % message)
[](javascript:void(0)😉
消費者
[](javascript:void(0)😉
1 import pika
2 import time
3
4 username = 'wt'#指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 channel = s_conn.channel()#在連接上創建一個頻道
9
10 channel.queue_declare(queue='task_queue', durable=True) #創建一個新隊列task_queue,設置隊列持久化,注意不要跟已存在的隊列重名,否則有報錯
11
12
13 def callback(ch, method, properties, body):
14 print(" [消費者] Received %r" % body)
15 time.sleep(1)
16 print(" [消費者] Done")
17 ch.basic_ack(delivery_tag=method.delivery_tag)# 接收到消息后會給rabbitmq發送一個確認
18
19 channel.basic_qos(prefetch_count=1) # 消費者給rabbitmq發送一個信息:在消費者處理完消息之前不要再給消費者發送消息
20
21 channel.basic_consume(callback,
22 queue='worker',
23 #這里就不用再寫no_ack=False了
24 )
25 channel.start_consuming()
[](javascript:void(0)😉
3.交換機
exchange:交換機。生產者不是將消息發送給隊列,而是將消息發送給交換機,由交換機決定將消息發送給哪個隊列。所以exchange必須准確知道消息是要送到哪個隊列,還是要被丟棄。因此要在exchange中給exchange定義規則,所有的規則都是在exchange的類型中定義的。
exchange有4個類型:direct, topic, headers ,fanout
之前,我們並沒有講過exchange,但是我們仍然可以將消息發送到隊列中。這是因為我們用的是默認exchange.也就是說之前寫的:exchange='',空字符串表示默認的exchange。
之前的代碼結構:
1 channel.basic_publish(exchange='',
2 routing_key='hello',
3 body=message)
exchange = '參數'
參數表示exchange 的名字,空字符串是默認或者沒有exchange。消息被路由到某隊列的根據是:routing_key.。如果routing_key的值存在的話。
現在,我們可以用我們自己命名的exchange來代替默認的exchange。
1 channel.basic_publish(exchange='logs',#自己命名exchange為logs
2 routing_key='',
3 body=message)
(1)fanout(廣播):
廣播類型,生產者將消息發送給所有消費者,如果某個消費者沒有收到當前消息,就再也收不到了(消費者就像收音機)
生產者:(可以用作日志收集系統)
[](javascript:void(0)😉
1 import pika
2 import sys
3 username = 'wt' #指定遠程rabbitmq的用戶名密碼
4 pwd = '111111'
5 user_pwd = pika.PlainCredentials(username, pwd)
6 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
7 channel = s_conn.channel() #在連接上創建一個頻道
8 channel.exchange_declare(exchange='logs',
9 type='fanout')#創建一個fanout(廣播)類型的交換機exchange,名字為logs。
10
11 message = "info: Hello World!"
12 channel.basic_publish(exchange='logs',#指定交換機exchange為logs,這里只需要指定將消息發給交換機logs就可以了,不需要指定隊列,因為生產者消息是發送給交換機的。
13 routing_key='',#在fanout類型中,綁定關鍵字routing_key必須忽略,寫空即可
14 body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()
[](javascript:void(0)😉
消費者:
[](javascript:void(0)😉
1 import pika
2 import sys
3
4 username = 'wt' #指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 channel = s_conn.channel() #在連接上創建一個頻道
9
10 channel.exchange_declare(exchange='logs',
11 type='fanout')#消費者需再次聲明一個exchange 以及類型。
12
13 result = channel.queue_declare(exclusive=True)#創建一個隊列,exclusive=True(唯一性)表示在消費者與rabbitmq斷開連接時,該隊列會自動刪除掉。
14 queue_name = result.method.queue#因為rabbitmq要求新隊列名必須是與現存隊列名不同,所以為保證隊列的名字是唯一的,method.queue方法會隨機創建一個隊列名字,如:‘amq.gen-JzTY20BRgKO-HjmUJj0wLg‘。
15
16 channel.queue_bind(exchange='logs',
17 queue=queue_name)#將交換機logs與接收消息的隊列綁定。表示生產者將消息發給交換機logs,logs將消息發給隨機隊列queue,消費者在隨機隊列queue中取消息
18
19 print(' [消費者] Waiting for logs. To exit press CTRL+C')
20
21 def callback(ch, method, properties, body):
22 print(" [消費者] %r" % body)
23
24 channel.basic_consume(callback,#調用回調函數從queue中取消息
25 queue=queue_name,
26 no_ack=True)#設置為消費者不給rabbitmq回復確認。
27
28 channel.start_consuming()#循環等待接收消息。
[](javascript:void(0)😉
這樣,開啟多個消費者后,會同時從生產者接收相同的消息。
(2)direct(關鍵字類型):
關鍵字類型。功能:交換機根據生產者消息中含有的不同的關鍵字將消息發送給不同的隊列,消費者根據不同的關鍵字從不同的隊列取消息
生產者:不用創建對列
[](javascript:void(0)😉
1 import pika
2 import sys
3
4 username = 'wt' #指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 channel = s_conn.channel() #在連接上創建一個頻道
9
10 channel.exchange_declare(exchange='direct_logs',
11 type='direct')#創建一個交換機並聲明exchange的類型為:關鍵字類型,表示該交換機會根據消息中不同的關鍵字將消息發送給不同的隊列
12
13 severity = 'info'#severity這里只能為一個字符串,這里為‘info’表明本生產者只將下面的message發送到info隊列中,消費者也只能從info隊列中接收info消息
14 message = 'Hello World!'
15 channel.basic_publish(exchange='direct_logs',#指明用於發布消息的交換機、關鍵字
16 routing_key=severity,#綁定關鍵字,即將message與關鍵字info綁定,明確將消息發送到哪個關鍵字的隊列中。
17 body=message)
18 print(" [生產者] Sent %r:%r" % (severity, message))
19 connection.close()
[](javascript:void(0)😉
消費者:
[](javascript:void(0)😉
1 import pika
2 import sys
3
4 username = 'wt' #指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 channel = s_conn.channel() #在連接上創建一個頻道
9
10 channel.exchange_declare(exchange='direct_logs',
11 type='direct')#創建交換機,命名為‘direct_logs’並聲明exchange類型為關鍵字類型。
12
13 result = channel.queue_declare(exclusive=True)#創建隨機隊列,當消費者與rabbitmq斷開連接時,這個隊列將自動刪除。
14 queue_name = result.method.queue#分配隨機隊列的名字。
15
16 severities = ['info','err']#可以接收綁定關鍵字info或err的消息,列表中也可以只有一個
17 if not severities:#判斷如果輸入有誤,輸出用法
18 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
19 sys.exit(1)
20
21 for severity in severities:
22 channel.queue_bind(exchange='direct_logs',#將交換機、隊列、關鍵字綁定在一起,使消費者只能根據關鍵字從不同隊列中取消息
23 queue=queue_name,
24 routing_key=severity)#該消費者綁定的關鍵字。
25
26 print(' [消費者] Waiting for logs. To exit press CTRL+C')
27
28 def callback(ch, method, properties, body):#定義回調函數,接收消息
29 print(" [消費者] %r:%r" % (method.routing_key, body))
30
31 channel.basic_consume(callback,
32 queue=queue_name,
33 no_ack=True)#消費者接收消息后,不給rabbimq回執確認。
34
35 channel.start_consuming()#循環等待消息接收。
[](javascript:void(0)😉
(3)topics:模糊匹配類型。比較常用
發送到一個 topics交換機的消息,它的 routing_key不能是任意的 -- 它的routing_key必須是一個用小數點分割的單詞列表。 這個字符可以是任何單詞,但是通常是一些指定意義的字符。比如:“stock.usd.nyse","nyse.vmw","quick.orange.rabbit". 這里可以是你想要路由鍵的任意字符。最高限制為255字節。
生產者與消費者的routing_key必須在同一個表單中。 Topic交換的背后的邏輯類似直接交換(direct) -- 包含特定關鍵字的消息將會分發到所有匹配的關鍵字隊列中。然后有兩個重要的特殊情況:
綁定鍵值:
> * (星) 可代替一個單詞
> # (井) 可代替0個或多個單詞
生產者:
[](javascript:void(0)😉
1 import pika
2 import sys
3
4 username = 'wt' #指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 channel = s_conn.channel() #在連接上創建一個頻道
9
10 channel.exchange_declare(exchange='topic_logs',
11 type='topic') # 創建模糊匹配類型的exchange。。
12
13 routing_key = '[warn].kern'##這里關鍵字必須為點號隔開的單詞,以便於消費者進行匹配。引申:這里可以做一個判斷,判斷產生的日志是什么級別,然后產生對應的routing_key,使程序可以發送多種級別的日志
14 message = 'Hello World!'
15 channel.basic_publish(exchange='topic_logs',#將交換機、關鍵字、消息進行綁定
16 routing_key=routing_key, # 綁定關鍵字,將隊列變成[warn]日志的專屬隊列
17 body=message)
18 print(" [x] Sent %r:%r" % (routing_key, message))
19 s_conn.close()
[](javascript:void(0)😉
消費者:
[](javascript:void(0)😉
1 import pika
2 import sys
3
4 username = 'wt'#指定遠程rabbitmq的用戶名密碼
5 pwd = '111111'
6 user_pwd = pika.PlainCredentials(username, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=user_pwd))#創建連接
8 channel = s_conn.channel()#在連接上創建一個頻道
9
10 channel.exchange_declare(exchange='topic_logs',
11 type='topic') # 聲明exchange的類型為模糊匹配。
12
13 result = channel.queue_declare(exclusive=True) # 創建隨機一個隊列當消費者退出的時候,該隊列被刪除。
14 queue_name = result.method.queue # 創建一個隨機隊列名字。
15
16 binding_keys = ['[warn]', 'info.*']#綁定鍵。‘#’匹配所有字符,‘*’匹配一個單詞。這里列表中可以為一個或多個條件,能通過列表中字符匹配到的消息,消費者都可以取到
17 if not binding_keys:
18 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
19 sys.exit(1)
20
21 for binding_key in binding_keys:#通過循環綁定多個“交換機-隊列-關鍵字”,只要消費者在rabbitmq中能匹配到與關鍵字相應的隊列,就從那個隊列里取消息
22 channel.queue_bind(exchange='topic_logs',
23 queue=queue_name,
24 routing_key=binding_key)
25
26 print(' [*] Waiting for logs. To exit press CTRL+C')
27
28
29 def callback(ch, method, properties, body):
30 print(" [x] %r:%r" % (method.routing_key, body))
31
32
33 channel.basic_consume(callback,
34 queue=queue_name,
35 no_ack=True)#不給rabbitmq發送確認
36
37 channel.start_consuming()#循環接收消息
[](javascript:void(0)😉
4.遠程過程調用(RPC)
Remote procedure call
消息屬性
AMQP協議在一個消息中預先定義了一個包含14個屬性的集合。大部分屬性很少用到,以下幾種除外:
> delivery_mode: 標記一個消息為持久的(值為2)或者 瞬時的(其它值), 你需要記住這個屬性(在第二課時用到過)
> content_type : 用來描述 MIME 類型的編碼 ,比如我們經常使用的 JSON 編碼,設置這個屬性就非常好實現: application/json
> reply_to:reply_to沒有特別的意義,只是一個普通的變量名,只是它通常用來命名一個 callback 隊列
> correlation_id : 用來關聯RPC的請求與應答。關聯id的作用:當在一個隊列中接收了一個返回,我們並不清楚這個結果時屬於哪個請求的,這樣當correlation_id屬性使用后,我們為每個請求設置一個唯一值,這個值就是關聯id。這樣,請求會有一個關聯id,該請求的返回結果也有一個相同的關聯id。然后當我們從callback隊列中接收到一個消息后,我們查看一下這個關聯,基於這個我們就能將請求和返回進行匹配。如果我們看到一個未知的correlation_id值,我們可以直接丟棄這個消息 -- 它是不屬於我們的請求。
1.RPC執行過程:
callback隊列
我們的RPC將會這樣執行:
> 當客戶端啟動后,它創建一個匿名的唯一的回調隊列
> 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來為每個請求設置一個唯一標識)
> 請求發送到 rpc_queue隊列
> RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現后,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to字段中的隊列
> 客戶端在callback 隊列中等待數據, 當一個消息出現后,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用
代碼:rpc_server.py代碼
[python]
- #!/usr/bin/env python
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='rpc_queue')
- def fib(n):
- if n == 0:
- return 0
- elif n == 1:
- return 1
- else:
- return fib(n-1) + fib(n-2)
- def on_request(ch, method, props, body):
- n = int(body)
- print(" [.] fib(%s)" % n)
- response = fib(n)
- ch.basic_publish(exchange='',
- routing_key=props.reply_to,
- properties=pika.BasicProperties(correlation_id = props.correlation_id),
- body=str(response))
- ch.basic_ack(delivery_tag = method.delivery_tag)
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(on_request, queue='rpc_queue')
- print(" [x] Awaiting RPC requests")
- channel.start_consuming()
服務端代碼詳單簡單:
> (4) 和往常一樣我們建立一個連接並定義一個隊列
> (11) 我們定義了 斐波納契 函數,假定輸入的都是合法正數
> (19) 我們定義了一個回調的 basic_consume, RPC服務的核心。 當收到請求后執行這個函數並返回結果
> (32) 我們可能會執行多個服務端,為了在多個服務端上均勻的分布負荷,我們需要這是 prefetch_count。
rpc_client.py 代碼:
[python]
- #!/usr/bin/env python
- import pika
- import uuid
- class FibonacciRpcClient(object):
- def init(self):
- self.connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- self.channel = self.connection.channel()
- result = self.channel.queue_declare(exclusive=True)
- self.callback_queue = result.method.queue
- self.channel.basic_consume(self.on_response, no_ack=True,
- queue=self.callback_queue)
- def on_response(self, ch, method, props, body):
- if self.corr_id == props.correlation_id:
- self.response = body
- def call(self, n):
- self.response = None
- self.corr_id = str(uuid.uuid4())
- self.channel.basic_publish(exchange='',
- routing_key='rpc_queue',
- properties=pika.BasicProperties(
- reply_to = self.callback_queue,
- correlation_id = self.corr_id,
- ),
- body=str(n))
- while self.response is None:
- self.connection.process_data_events()
- return int(self.response)
- fibonacci_rpc = FibonacciRpcClient()
- print(" [x] Requesting fib(30)")
- response = fibonacci_rpc.call(30)
- print(" [.] Got %r" % response)
客戶端代碼稍微復雜些:
> (7) 我們建立一個連接,通道並定義一個專門的’callback‘隊列用來接收回復
> (16) 我們訂閱了“callback”隊列,因此我們能夠接收 RPC 的返回結果
> (18) ’on_response' 在每個返回中執行的回調是一個簡單的job, 對每個返回消息將檢查是否correlation_id使我們需要查找的那個ID,如果是,將保存結果到 self.response 並終端consuming循環
> (23) 下一步,我們定義我們的main方法 - 執行實際的RPC請求
> (24) 在這方法中,首先我們生產一個唯一的 correlatin_id 號並保存 -- 'on_response"回調函數將用着號碼來匹配發送和接收的消息值
> (25) 下一步,發布請求信息,使用兩個屬性: reply_to 和 correlation_id
> (32) 這一步我們可以坐等結果的返回
>(33) 最后我們返回結果給用戶
或者 看下邊一篇好理解一點
前面的例子都有個共同點,就是發送端發送消息出去后沒有結果返回。如果只是單純發送消息,當然沒有問題了,但是在實際中,常常會需要接收端將收到的消息進行處理之后,返回給發送端。
處理方法描述:發送端在發送信息前,產生一個接收消息的臨時隊列,該隊列用來接收返回的結果。其實在這里接收端、發送端的概念已經比較模糊了,因為發送端也同樣要接收消息,接收端同樣也要發送消息,所以這里筆者使用另外的示例來演示這一過程。
示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個自然數N發送給計算節點,計算節點將N值加1后,返回給控制中心。這里用center.py模擬控制中心,compute.py模擬計算節點。
compute.py代碼分析
計算節點的代碼比較簡單,值得一提的是,原來的接收方法都是直接將消息打印出來,這邊進行了加一的計算,並將結果發送回控制中心。
center.py代碼分析
上例代碼定義了接收返回數據的隊列和處理方法,並且在發送請求的時候將該隊列賦值給reply_to,在計算節點代碼中就是通過這個參數來獲取返回隊列的。
打開兩個終端,一個運行代碼python compute.py,另外一個終端運行center.py,如果執行成功,應該就能看到效果了。
筆者在測試的時候,出了些小問題,就是在center.py發送消息時沒有指明返回隊列,結果compute.py那邊在計算完結果要發回數據時報錯,提示routing_key不存在,再次運行也報錯。用rabbitmqctl list_queues查看隊列,發現compute_queue隊列有1條數據,每次重新運行compute.py的時候,都會重新處理這條數據。后來使用/etc/init.d/rabbitmq-server restart重新啟動下rabbitmq就ok了。
參考文章:http://www.rabbitmq.com/tutorials/tutorial-six-python.html