python中RabbitMQ的使用(工作隊列)


消息可以理解為任務,消息發送者可以看成任務派送者(sender),消息接收者可以看成工作者(worker)。

當工作者接收到一個任務,還沒完任務時分配者又發一個任務,此時需要多個工作者來共同處理這些任務。

任務分派結構圖如下:

注:此時有一個任務派送人P,兩個工作接收者C1和C2。

現在我們來模擬該情況:

1.首先打開三個終端:

2.分別在前兩個終端運行receive1.py

 3.在第三個終端多次運行send1.py

 此時將會輪流向worker1和worker2分派任務。

問題:

在以上任務分配和完成情況中,有幾個問題將會產生:

1.工作者任務是否完成?

2.工作者掛掉后,如何防止未完成的任務丟失,並且如何處理這些任務?

3.RabbitMQ自身出現問題,此時如何防止任務丟失?

4.任務有輕重之分,如何實現公平調度?

方案:

1.消息確認(Message acknowledgment)

當任務完成后,工作者(receiver)將消息反饋給RabbitMQ:

1 def callback(ch, method, properties, body):
2     print " [x] Received %r" % (body,)
3     #停頓5秒,方便ctrl+c退出
4     time.sleep(5)
5     print " [x] Done"
6     #當工作者完成任務后,會反饋給rabbitmq
7     ch.basic_ack(delivery_tag=method.delivery_tag)

2.保留任務(no_ack=False)

當工作者掛掉后,防止任務丟失:

# 去除no_ack=True參數或者設置為False后可以實現
# 一個工作者ctrl+c退出后,正在執行的任務也不會丟失,rabbitmq會將任務重新分配給其他工作者。
channel.basic_consume(callback, queue='task_queue', no_ack=False)

3.消息持久化存儲(Message durability)

聲明持久化存儲:

# durable=True即聲明持久化存儲
channel.queue_declare(queue='task_queue', durable=True)

在發送任務時,用delivery_mode=2來標記任務為持久化存儲:

1 # 用delivery_mode=2來標記任務為持久化存儲:
2 channel.basic_publish(exchange='',
3                       routing_key='task_queue',
4                       body=message,
5                       properties=pika.BasicProperties(
6                           delivery_mode=2,
7                       ))

4.公平調度(Fair dispatch)

使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務,即只有工作者完成任務之后,才會再次接收到任務

channel.basic_qos(prefetch_count=1)

完整代碼如下:

receive1.py

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 import pika
 4 import time
 5 
 6 hostname = '192.168.1.133'
 7 parameters = pika.ConnectionParameters(hostname)
 8 connection = pika.BlockingConnection(parameters)
 9 
10 # 創建通道
11 channel = connection.channel()
12 # durable=True后將任務持久化存儲,防止任務丟失
13 channel.queue_declare(queue='task_queue', durable=True)
14 
15 
16 # ch.basic_ack為當工作者完成任務后,會反饋給rabbitmq
17 def callback(ch, method, properties, body):
18     print " [x] Received %r" % (body,)
19     time.sleep(5)
20     print " [x] Done"
21     ch.basic_ack(delivery_tag=method.delivery_tag)
22 
23 # basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務,
24 # 即只有工作者完成任務之后,才會再次接收到任務。
25 channel.basic_qos(prefetch_count=1)
26 
27 # 去除no_ack=True參數或者設置為False后可以實現
28 # 一個工作者ctrl+c退出后,正在執行的任務也不會丟失,rabbitmq會將任務重新分配給其他工作者。
29 channel.basic_consume(callback, queue='task_queue', no_ack=False)
30 # 開始接收信息,按ctrl+c退出
31 print ' [*] Waiting for messages. To exit press CTRL+C'
32 channel.start_consuming()

send1.py

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 import pika
 4 import random
 5 
 6 hostname = '192.168.1.133'
 7 parameters = pika.ConnectionParameters(hostname)
 8 connection = pika.BlockingConnection(parameters)
 9 
10 # 創建通道
11 channel = connection.channel()
12 # 如果rabbitmq自身掛掉的話,那么任務會丟失。所以需要將任務持久化存儲起來,聲明持久化存儲:
13 channel.queue_declare(queue='task_queue', durable=True)
14 
15 number = random.randint(1, 1000)
16 message = 'hello world:%s' % number
17 
18 # 在發送任務的時候,用delivery_mode=2來標記任務為持久化存儲:
19 channel.basic_publish(exchange='',
20                       routing_key='task_queue',
21                       body=message,
22                       properties=pika.BasicProperties(
23                           delivery_mode=2,
24                       ))
25 print " [x] Sent %r" % (message,)
26 connection.close()

示例如下:

首先啟動三個終端,兩個先執行receive1.py,第三個多次執行rend1.py:

終端3:

此時分配三個任務,33分配給worker1,170分配給worker2,262分配給worker1

終端1:

worker1完成任務33后,開始任務262,我們在任務完成前使用(CRTL+C)使worker1掛掉

終端2:

worker2完成任務170,本來沒有任務,但是worker1掛掉,此時接收他的任務262

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM