RabbitMQ入門(二)工作隊列


  在文章RabbitMQ入門(一)之Hello World,我們編寫程序通過指定的隊列來發送和接受消息。在本文中,我們將會創建工作隊列(Work Queue),通過多個workers來分配耗時任務。
  工作隊列(Work Queue,也被成為Task Queue,任務隊列)的中心思想是,避免立即執行一個資源消耗巨大且必須等待其完成的任務。相反地,我們調度好隊列可以安排該任務稍后執行。我們將一個任務(task)封裝成一個消息,將它發送至隊列。一個在后台運行的work進程將會拋出該任務,並最終執行該任務。當你運行多個workers的時候,任務將會在它們之中共享。
  這個概念在web開發中很有用,因為通過一個短的HTTP請求不可能處理復雜的任務。
  在之前的文章中,我們發送了一個包含“Hello World!”的消息。現在我們將會發送代表復雜任務的字串符。我們並沒有實際上的任務,比如重新調整圖片的尺寸或者渲染PDF,我們假裝有這樣的復雜任務,通過使用time.sleep()函數。我們將會用字符串中的點(.)來代表復雜度;每一個點代表一秒中的任務。舉例來說,字符串Hello...需要花費三秒。
  我們需要稍微修改下sent.py中的代碼,允許在命令中輸入任意字符串。該程序會調度任務至工作隊列,因此命名為new_task.py

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

  我們原先的receive.py也需要改動:它需要在消息體中將字符串的每一個點代表1秒鍾的任務。它會從隊列中拋出消息並執行該消息,因此命名為task.py

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

Round-Robin分發(輪詢分發)

  使用工作隊列的一個好處就是它能夠輕松實現並行工作。如果我們創建了一項積壓的工作,那么我們可以增加更多的worker來使它的擴展性更好。
  首先,我們同時運行兩個worker.py腳本。他們都能夠從隊列中獲取消息,但是具體是怎么實現的呢?讓我們接着閱讀。
  你需要打開三個終端查看。兩個終端用於運行worker.py腳本。這兩個終端將會成為兩個消費者——C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

在第三個終端中,我們將會產生新的任務。一旦你啟動了這些消費者,你就可以發送一些消息了:

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

讓我們看看這兩個workers傳遞了什么:

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

RabbitMQ默認會將每個消息依次地發送給下一個消費者。因此總的來說,每個消費者將會同樣數量的消息。這種消息分配的方法叫Round-Robin。你可以嘗試三個或者更多的worker。

消息確認(Message Acknowledgement)

  執行一項任務需要花費幾秒鍾。你也許會好奇,如果其中一個消費者執行一項耗時很長的任務,並且在執行了一部分的時候掛掉了,將會發生什么?根據我們現在的代碼,一旦RabbitMQ將消息傳送至消費者,那么RabbitMQ就會標志它為刪除狀態。在這種情況下,如果我們殺死某個worker,我們將會失去他正在處理的消息。我們也會失去所有分配至這個worker的消息,當然,這些消息還未被處理。
  但是,我們不希望失去任何一項任務。如果有一個worker掛掉了,我們希望這些任務能夠被傳送至另一個worker。
  為了確保消息不丟失,RabbitMQ支持消息確認。一個ack(nowledgement)是由消費者發送回來的,用於告訴RabbitMQ,這個特定的消息已經被接受,被處理,可以被刪除了。
  如果一個消費者掛了(它的channel關閉了,連接關閉了,或者TCP連接丟失)但是沒有發送一個ack,RabbitMQ就會知道這個消息並未被完全處理,會將它重新塞進隊列。如果同時還存在着其他在線消費者,RabbbitMQ將會將這個消息重新傳送給另一個消費者。用這種方式可以確保沒有消息丟失,即使workers偶爾會刮掉。
  並不存在消息超時;如果消費者掛了,RabbitMQ將會重新傳送消息。這樣即使處理一個消息需要消耗很長很長的時間,也是可以的。
  默認的消息確認方式為人工消息確認。在我們之前的例子中,我們清晰地將它關閉了,使用了auto_ack=True這個命令。當我們完成一項任務的時候,根據需要,移除這個標志,從worker中發送一個合適的確認。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep( body.count('.') )
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)

使用上述代碼,我們可以確保,即使我們使用CTRL+C命令殺死了一個正在處理消息的woker,也不會丟失什么。這個worker掛掉后不久,所有未確認的消息將會被重新傳送。
  消息確認必須在同一個傳輸消息的channel中發送。嘗試着在不同的channel中進行消息確認將會引發channel-level protocol exception。

消息持久化(Message Durability)

  我們已經學習了如何在消費者掛掉的情況下,任務不會丟失。但是,當RabbitMQ server停止時,我們的任務仍然會丟失。
  當RabbitMQ停止或崩潰時,它將會忘記所有的隊列和消息,除非你告訴它不這么做。在這種情況下,需要做兩個事情確保消息不會丟失:我們需要將隊列和消息都設置為持久化。
  首先,我們需要確保RabbitMQ不會丟失隊列。為了實現這個,我們需要將隊列聲明為持久化:

channel.queue_declare(queue='hello', durable=True)

盡管這個命令是正確的,但他仍會不會起作用。這是因為,我們已經創建了一個叫為hello的非持久化隊列。RabbitMQ不允許你重新定義一個已經存在的隊列而參數不一樣,所有這樣做的程序只會引發錯誤。但是有一個快速的應變辦法——我們可以創建一個不同名稱的隊列,比如task_queue

channel.queue_declare(queue='task_queue', durable=True)

queue_declare需要同時應用於生產者和消費者。
  在這點上我們可以確保task_queue隊列不會丟失消息即使RabbitMQ重啟。現在,我們需要聲明消息為持久化——將delivery_mode這個參數設置為2。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

公平分發(Fair Dispatch)

  你也許注意到了,剛才的消息分發機制並不會嚴格地按照我們所希望的方式進行。舉這樣一個例子,設想有兩個worker,而所有的奇數消息都很重而偶數消息都是輕量級的,這樣其中一個worker就會一直很忙而另一個worker幾乎不做什么工作。然而,RabbitMQ對此一無所知,它仍然會平均分配消息。
  這種情況的發生是因為RabbitMQ僅僅是當消息進入隊列的時候就會分發這個消息。它並不會注意消費者所接收的未確認的消息數量。它盲目地將第n個消息發送至第n個消費者。

  為了克服這種情況,我們可以在basic.qos方法中設置prefetch_count=1。這就告訴RabbitMQ一次不要將多於一個的消息發送給一個worker。換句話說,不要分發一個新的消息給worker除非這個worker已經處理好之前的消息並且進行了消息確認。也就說,RabbitMQ將會將這個消息分發給下一個不是很忙的worker。

channel.basic_qos(prefetch_count=1)

實戰1

  為了對上面的例子有一個好的理解,我們需要寫代碼進行實際操練一下。
  生產者new_task.py的代碼如下:

# -*- coding: utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

  消費者worker.py的完整代碼如下:

# -*- coding: utf-8 -*-

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

  開啟三個終端,消息的發送和接收情況如下:
message發送與接收情況
  如果我們停掉其中一個worker,那么消息的接收情況如下:
其中一個worker掛掉
可以看到,現在所有發送的消息都會被這個仍在工作的worker接收到。

實戰2

  接下來,我們將會使用RabbitMQ的這種工作隊列的方式往MySQL數據庫中的表插入數據。
  數據庫為orm_test,表格為exam_user,表結構如下:

exam_user數據庫表結構

  接下來,我們需要往這張表中插入隨機創建的數據。如果我們利用Python的第三方模塊pymysql,每一次插入一條記錄,那么一分鍾插入53237條記錄。
  利用RabbitMQ,我們的生產者代碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice

names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
         'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
         'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
         'DG009', 'DG010', 'DG020']


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for id in range(1, 20000001):
    name = choice(names)
    place = choice(places)
    type2 = choice(types)
    message = "insert into exam_users values(%s, '%s', '%s', '%s');" % (id, name, place, type2)

    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)
connection.close()

  消費者代碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import time
import pymysql

# 打開數據庫連接
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# 使用 cursor() 方法創建一個游標對象 cursor
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    cursor.execute(body)
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

我們開啟9個終端,其中8個消費者1個生產者,先啟動消費者,然后生產者,按照上面的數據導入方式,一分鍾插入了133084條記錄,是普通方式的2.50倍,效率有大幅度提升!
  讓我們稍微修改下生產者和消費者的代碼,一次提交插入多條記錄,減少每提交一次就插入一條記錄的消耗時間。新的生產者代碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice
import json

names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
         'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
         'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
         'DG009', 'DG010', 'DG020']


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for _ in range(1, 200001):

    values = []
    for i in range(100):
        name = choice(names)
        place = choice(places)
        type2 = choice(types)
        values.append([100*_+i+1, name, place, type2])
    message = json.dumps(values)


    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)

connection.close()

  新的消費者的代碼如下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import json
import time
import pymysql

# 打開數據庫連接
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# 使用 cursor() 方法創建一個游標對象 cursor
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sql = 'insert into exam_users values(%s, %s, %s, %s)'
    cursor.executemany(sql, json.loads(body))
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

跟剛才一樣,我們開啟9個終端,其中8個消費者1個生產者,先啟動消費者,然后生產者,按照上面的數據導入方式,一分鍾插入了3170600條記錄,是普通方式的59.56倍,是先前一次只提交一條記錄的插入方式的23.82倍。這樣的提速無疑是非常驚人的!
  當然還有更高效的數據插入方法,本文的方法僅僅是為了演示RabbitMQ的工作隊列以及在插入數據方面的提速。

  本次分享到此結束,感謝大家閱讀~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM