在文章RabbitMQ入門(一)之Hello World,我們編寫程序通過指定的隊列來發送和接受消息。在本文中,我們將會創建工作隊列
(Work Queue),通過多個workers來分配耗時任務。
工作隊列(Work Queue,也被成為Task Queue,任務隊列)的中心思想是,避免立即執行一個資源消耗巨大且必須等待其完成的任務。相反地,我們調度好隊列可以安排該任務稍后執行。我們將一個任務(task)封裝成一個消息,將它發送至隊列。一個在后台運行的work進程將會拋出該任務,並最終執行該任務。當你運行多個workers的時候,任務將會在它們之中共享。
這個概念在web開發中很有用,因為通過一個短的HTTP請求不可能處理復雜的任務。
在之前的文章中,我們發送了一個包含“Hello World!”的消息。現在我們將會發送代表復雜任務的字串符。我們並沒有實際上的任務,比如重新調整圖片的尺寸或者渲染PDF,我們假裝有這樣的復雜任務,通過使用time.sleep()
函數。我們將會用字符串中的點(.)來代表復雜度;每一個點代表一秒中的任務。舉例來說,字符串Hello...
需要花費三秒。
我們需要稍微修改下sent.py
中的代碼,允許在命令中輸入任意字符串。該程序會調度任務至工作隊列,因此命名為new_task.py
:
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(" [x] Sent %r" % message)
我們原先的receive.py
也需要改動:它需要在消息體中將字符串的每一個點代表1秒鍾的任務。它會從隊列中拋出消息並執行該消息,因此命名為task.py
:
import time
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
Round-Robin分發(輪詢分發)
使用工作隊列的一個好處就是它能夠輕松實現並行工作。如果我們創建了一項積壓的工作,那么我們可以增加更多的worker來使它的擴展性更好。
首先,我們同時運行兩個worker.py
腳本。他們都能夠從隊列中獲取消息,但是具體是怎么實現的呢?讓我們接着閱讀。
你需要打開三個終端查看。兩個終端用於運行worker.py
腳本。這兩個終端將會成為兩個消費者——C1和C2。
# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
在第三個終端中,我們將會產生新的任務。一旦你啟動了這些消費者,你就可以發送一些消息了:
# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
讓我們看看這兩個workers傳遞了什么:
# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
RabbitMQ默認會將每個消息依次地發送給下一個消費者。因此總的來說,每個消費者將會同樣數量的消息。這種消息分配的方法叫Round-Robin
。你可以嘗試三個或者更多的worker。
消息確認(Message Acknowledgement)
執行一項任務需要花費幾秒鍾。你也許會好奇,如果其中一個消費者執行一項耗時很長的任務,並且在執行了一部分的時候掛掉了,將會發生什么?根據我們現在的代碼,一旦RabbitMQ將消息傳送至消費者,那么RabbitMQ就會標志它為刪除狀態。在這種情況下,如果我們殺死某個worker,我們將會失去他正在處理的消息。我們也會失去所有分配至這個worker的消息,當然,這些消息還未被處理。
但是,我們不希望失去任何一項任務。如果有一個worker掛掉了,我們希望這些任務能夠被傳送至另一個worker。
為了確保消息不丟失,RabbitMQ支持消息確認
。一個ack(nowledgement)是由消費者發送回來的,用於告訴RabbitMQ,這個特定的消息已經被接受,被處理,可以被刪除了。
如果一個消費者掛了(它的channel關閉了,連接關閉了,或者TCP連接丟失)但是沒有發送一個ack,RabbitMQ就會知道這個消息並未被完全處理,會將它重新塞進隊列。如果同時還存在着其他在線消費者,RabbbitMQ將會將這個消息重新傳送給另一個消費者。用這種方式可以確保沒有消息丟失,即使workers偶爾會刮掉。
並不存在消息超時;如果消費者掛了,RabbitMQ將會重新傳送消息。這樣即使處理一個消息需要消耗很長很長的時間,也是可以的。
默認的消息確認
方式為人工消息確認
。在我們之前的例子中,我們清晰地將它關閉了,使用了auto_ack=True
這個命令。當我們完成一項任務的時候,根據需要,移除這個標志,從worker中發送一個合適的確認。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep( body.count('.') )
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
使用上述代碼,我們可以確保,即使我們使用CTRL+C
命令殺死了一個正在處理消息的woker,也不會丟失什么。這個worker掛掉后不久,所有未確認的消息將會被重新傳送。
消息確認
必須在同一個傳輸消息的channel中發送。嘗試着在不同的channel中進行消息確認將會引發channel-level protocol exception。
消息持久化(Message Durability)
我們已經學習了如何在消費者掛掉的情況下,任務不會丟失。但是,當RabbitMQ server停止時,我們的任務仍然會丟失。
當RabbitMQ停止或崩潰時,它將會忘記所有的隊列和消息,除非你告訴它不這么做。在這種情況下,需要做兩個事情確保消息不會丟失:我們需要將隊列和消息都設置為持久化。
首先,我們需要確保RabbitMQ不會丟失隊列。為了實現這個,我們需要將隊列聲明為持久化:
channel.queue_declare(queue='hello', durable=True)
盡管這個命令是正確的,但他仍會不會起作用。這是因為,我們已經創建了一個叫為hello
的非持久化隊列。RabbitMQ不允許你重新定義一個已經存在的隊列而參數不一樣,所有這樣做的程序只會引發錯誤。但是有一個快速的應變辦法——我們可以創建一個不同名稱的隊列,比如task_queue
:
channel.queue_declare(queue='task_queue', durable=True)
queue_declare
需要同時應用於生產者和消費者。
在這點上我們可以確保task_queue
隊列不會丟失消息即使RabbitMQ重啟。現在,我們需要聲明消息為持久化——將delivery_mode
這個參數設置為2。
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
公平分發(Fair Dispatch)
你也許注意到了,剛才的消息分發機制並不會嚴格地按照我們所希望的方式進行。舉這樣一個例子,設想有兩個worker,而所有的奇數消息都很重而偶數消息都是輕量級的,這樣其中一個worker就會一直很忙而另一個worker幾乎不做什么工作。然而,RabbitMQ對此一無所知,它仍然會平均分配消息。
這種情況的發生是因為RabbitMQ僅僅是當消息進入隊列的時候就會分發這個消息。它並不會注意消費者所接收的未確認的消息數量。它盲目地將第n個消息發送至第n個消費者。
為了克服這種情況,我們可以在basic.qos
方法中設置prefetch_count=1
。這就告訴RabbitMQ一次不要將多於一個的消息發送給一個worker。換句話說,不要分發一個新的消息給worker除非這個worker已經處理好之前的消息並且進行了消息確認。也就說,RabbitMQ將會將這個消息分發給下一個不是很忙的worker。
channel.basic_qos(prefetch_count=1)
實戰1
為了對上面的例子有一個好的理解,我們需要寫代碼進行實際操練一下。
生產者new_task.py
的代碼如下:
# -*- coding: utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
消費者worker.py
的完整代碼如下:
# -*- coding: utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
開啟三個終端,消息的發送和接收情況如下:
如果我們停掉其中一個worker,那么消息的接收情況如下:
可以看到,現在所有發送的消息都會被這個仍在工作的worker接收到。
實戰2
接下來,我們將會使用RabbitMQ的這種工作隊列的方式往MySQL數據庫中的表插入數據。
數據庫為orm_test,表格為exam_user,表結構如下:
接下來,我們需要往這張表中插入隨機創建的數據。如果我們利用Python的第三方模塊pymysql,每一次插入一條記錄,那么一分鍾插入53237條記錄。
利用RabbitMQ,我們的生產者代碼如下:
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice
names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
'DG009', 'DG010', 'DG020']
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
for id in range(1, 20000001):
name = choice(names)
place = choice(places)
type2 = choice(types)
message = "insert into exam_users values(%s, '%s', '%s', '%s');" % (id, name, place, type2)
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
消費者代碼如下:
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import time
import pymysql
# 打開數據庫連接
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")
# 使用 cursor() 方法創建一個游標對象 cursor
cursor = db.cursor()
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
cursor.execute(body)
db.commit()
print(" [x] Insert successfully!")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
我們開啟9個終端,其中8個消費者1個生產者,先啟動消費者,然后生產者,按照上面的數據導入方式,一分鍾插入了133084條記錄,是普通方式的2.50倍,效率有大幅度提升!
讓我們稍微修改下生產者和消費者的代碼,一次提交插入多條記錄,減少每提交一次就插入一條記錄的消耗時間。新的生產者代碼如下:
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice
import json
names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
'DG009', 'DG010', 'DG020']
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
for _ in range(1, 200001):
values = []
for i in range(100):
name = choice(names)
place = choice(places)
type2 = choice(types)
values.append([100*_+i+1, name, place, type2])
message = json.dumps(values)
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
新的消費者的代碼如下:
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import json
import time
import pymysql
# 打開數據庫連接
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")
# 使用 cursor() 方法創建一個游標對象 cursor
cursor = db.cursor()
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
sql = 'insert into exam_users values(%s, %s, %s, %s)'
cursor.executemany(sql, json.loads(body))
db.commit()
print(" [x] Insert successfully!")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
跟剛才一樣,我們開啟9個終端,其中8個消費者1個生產者,先啟動消費者,然后生產者,按照上面的數據導入方式,一分鍾插入了3170600條記錄,是普通方式的59.56倍,是先前一次只提交一條記錄的插入方式的23.82倍。這樣的提速無疑是非常驚人的!
當然還有更高效的數據插入方法,本文的方法僅僅是為了演示RabbitMQ的工作隊列以及在插入數據方面的提速。
本次分享到此結束,感謝大家閱讀~