RabbitMQ 任務分發機制


   在上篇文章中,我們解決了從發送端(Producer)向接收端(Consumer)發送“Hello World”的問題。在實際的應用場景中,這是遠遠不夠的。從本篇文章開始,我們將結合更加實際的應用場景來講解更多的高級用法。

   當有Consumer需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance每個Consumer的load。試想一下,對於web application來說,在一個很多的HTTP request里是沒有時間來處理復雜的運算的,只能通過后台的一些工作線程來完成。接下來我們分布講解。 

   應用場景就是RabbitMQ Server會將queue的Message分發給不同的Consumer以處理計算密集型的任務:



 

1. 准備

 

   在上一篇文章中,我們簡單在Message中包含了一個字符串"Hello World"。現在為了是Consumer做的是計算密集型的工作,那就不能簡單的字符串了。在現實應用中,Consumer有可能做的是一個圖片的resize,或者是pdf文件的渲染或者內容提取。但是作為Demo,還是用字符串模擬吧:通過字符串中的.的數量來決定計算的復雜度,每個.都會消耗1s,即sleep(1)。

    還是復用上篇文章中的code,根據“計算密集型”做一下簡單的修改,為了辨別,我們把send.py 的名字換成new_task.py

 

[python] view plain copy
 
print?
  1. import sys  
  2.   
  3. message = ' '.join(sys.argv[1:]) or "Hello World!"  
  4. channel.basic_publish(exchange='',  
  5.                       routing_key='hello',  
  6.                       body=message)  
  7. print " [x] Sent %r" % (message,)  
import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

同樣的道理,把receive.py的名字換成worker.py,並且根據Message中的.的數量進行計算密集型模擬:

 

 

[python] view plain copy
 
print?
  1. import time  
  2.   
  3. def callback(ch, method, properties, body):  
  4.     print " [x] Received %r" % (body,)  
  5.     time.sleep( body.count('.') )  
  6.     print " [x] Done"  
import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"

 

 

2. Round-robin dispatching 循環分發

 

        RabbitMQ的分發機制非常適合擴展,而且它是專門為並發程序設計的。如果現在load加重,那么只需要創建更多的Consumer來進行任務處理即可。當然了,對於負載還要加大怎么辦?我沒有遇到過這種情況,那就可以創建多個virtual Host,細化不同的通信類別了。

     首先開啟兩個Consumer,即運行兩個worker.py。

Console1:

 

[python] view plain copy
 
print?
  1. shell1$ python worker.py  
  2.  [*] Waiting for messages. To exit press CTRL+C  
shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

Consule2:

 

 

[python] view plain copy
 
print?
  1. shell2$ python worker.py  
  2.  [*] Waiting for messages. To exit press CTRL+C  
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

Producer new_task.py要Publish Message了:

 

 

[python] view plain copy
 
print?
  1. shell3$ python new_task.py First message.  
  2. shell3$ python new_task.py Second message..  
  3. shell3$ python new_task.py Third message...  
  4. shell3$ python new_task.py Fourth message....  
  5. shell3$ python new_task.py Fifth message.....  
shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

注意一下:.代表的sleep(1)。接着開一下Consumer worker.py收到了什么:

 

Console1:

 

[python] view plain copy
 
print?
  1. shell1$ python worker.py  
  2.  [*] Waiting for messages. To exit press CTRL+C  
  3.  [x] Received 'First message.'  
  4.  [x] Received 'Third message...'  
  5.  [x] Received 'Fifth message.....'  
shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

Console2:

 

 

[python] view plain copy
 
print?
  1. shell2$ python worker.py  
  2.  [*] Waiting for messages. To exit press CTRL+C  
  3.  [x] Received 'Second message..'  
  4.  [x] Received 'Fourth message....'  
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默認情況下,RabbitMQ 會順序的分發每個Message。當每個收到ack后,會將該Message刪除,然后將下一個Message分發到下一個Consumer。這種分發方式叫做round-robin。這種分發還有問題,接着向下讀吧。

 

 

3. Message acknowledgment 消息確認

 

      每個Consumer可能需要一段時間才能處理完收到的數據。如果在這個過程中,Consumer出錯了,異常退出了,而數據還沒有處理完成,那么非常不幸,這段數據就丟失了。因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數據后,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。

     如果一個Consumer異常退出了,它處理的數據能夠被另外的Consumer處理,這樣數據在這種情況下就不會丟失了(注意是這種情況下)。

      為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數據后發送ack。

    在處理數據后發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。

    如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數據也不會丟失。

    這里並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。

    默認情況下,消息確認是打開的(enabled)。在上篇文章中我們通過no_ack = True 關閉了ack。重新修改一下callback,以在消息處理完成后發送ack:

 

[python] view plain copy
 
print?
  1. def callback(ch, method, properties, body):  
  2.     print " [x] Received %r" % (body,)  
  3.     time.sleep( body.count('.') )  
  4.     print " [x] Done"  
  5.     ch.basic_ack(delivery_tag = method.delivery_tag)  
  6.   
  7. channel.basic_consume(callback,  
  8.                       queue='hello')  
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

     這樣即使你通過Ctr-C中斷了worker.py,那么Message也不會丟失了,它會被分發到下一個Consumer。

 

      如果忘記了ack,那么后果很嚴重。當Consumer退出時,Message會重新分發。然后RabbitMQ會占用越來越多的內存,由於RabbitMQ會長時間運行,因此這個“內存泄漏”是致命的。去調試這種錯誤,可以通過一下命令打印un-acked Messages:

 

[python] view plain copy
 
print?
  1. $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged  
  2. Listing queues ...  
  3. hello    0       0  
  4. ...done.  
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

 

 

4. Message durability消息持久化

 

     在上一節中我們知道了即使Consumer異常退出,Message也不會丟失。但是如果RabbitMQ Server退出呢?軟件都有bug,即使RabbitMQ Server是完美毫無bug的(當然這是不可能的,是軟件就有bug,沒有bug的那不叫軟件),它還是有可能退出的:被其它軟件影響,或者系統重啟了,系統panic了。。。

    為了保證在RabbitMQ退出或者crash了數據仍沒有丟失,需要將queue和Message都要持久化。

queue的持久化需要在聲明時指定durable=True:

 

[python] view plain copy
 
print?
  1. channel.queue_declare(queue='hello', durable=True)  
channel.queue_declare(queue='hello', durable=True)

上述語句執行不會有什么錯誤,但是確得不到我們想要的結果,原因就是RabbitMQ Server已經維護了一個叫hello的queue,那么上述執行不會有任何的作用,也就是hello的任何屬性都不會被影響。這一點在上篇文章也討論過。

 

那么workaround也很簡單,聲明一個另外的名字的queue,比如名字定位task_queue:

 

[python] view plain copy
 
print?
  1. channel.queue_declare(queue='task_queue', durable=True)  
channel.queue_declare(queue='task_queue', durable=True)

再次強調,Producer和Consumer都應該去創建這個queue,盡管只有一個地方的創建是真正起作用的:

 

接下來,需要持久化Message,即在Publish的時候指定一個properties,方式如下:

 

[python] view plain copy
 
print?
  1. channel.basic_publish(exchange='',  
  2.                       routing_key="task_queue",  
  3.                       body=message,  
  4.                       properties=pika.BasicProperties(  
  5.                          delivery_mode = 2, # make message persistent  
  6.                       ))  
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

關於持久化的進一步討論:

 

    為了數據不丟失,我們采用了:

  1. 在數據處理結束后發送ack,這樣RabbitMQ Server會認為Message Deliver 成功。
  2. 持久化queue,可以防止RabbitMQ Server 重啟或者crash引起的數據丟失。
  3. 持久化Message,理由同上。

    但是這樣能保證數據100%不丟失嗎?

    答案是否定的。問題就在與RabbitMQ需要時間去把這些信息存到磁盤上,這個time window雖然短,但是它的確還是有。在這個時間窗口內如果數據沒有保存,數據還會丟失。還有另一個原因就是RabbitMQ並不是為每個Message都做fsync:它可能僅僅是把它保存到Cache里,還沒來得及保存到物理磁盤上。

    因此這個持久化還是有問題。但是對於大多數應用來說,這已經足夠了。當然為了保持一致性,你可以把每次的publish放到一個transaction中。這個transaction的實現需要user defined codes。

    那么商業系統會做什么呢?一種可能的方案是在系統panic時或者異常重啟時或者斷電時,應該給各個應用留出時間去flash cache,保證每個應用都能exit gracefully。

 

5. Fair dispatch 公平分發

 

    你可能也注意到了,分發機制不是那么優雅。默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。當然n是取余后的。它不管Consumer是否還有unacked Message,只是按照這個默認機制進行分發。

   那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。那么,RabbitMQ是如何處理這種問題呢?

 

  通過 basic.qos 方法設置prefetch_count=1 。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。 設置方法如下:

 

[python] view plain copy
 
print?
  1. channel.basic_qos(prefetch_count=1)  
channel.basic_qos(prefetch_count=1)

注意,這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。

 

 

6. 最終版本

new_task.py script:

 

[python] view plain copy
 
print?
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. channel.queue_declare(queue='task_queue', durable=True)  
  10.   
  11. message = ' '.join(sys.argv[1:]) or "Hello World!"  
  12. channel.basic_publish(exchange='',  
  13.                       routing_key='task_queue',  
  14.                       body=message,  
  15.                       properties=pika.BasicProperties(  
  16.                          delivery_mode = 2, # make message persistent  
  17.                       ))  
  18. print " [x] Sent %r" % (message,)  
  19. connection.close()  
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

 

worker.py script:

 

[python] view plain copy
 
print?
  1. #!/usr/bin/env python  
  2. import pika  
  3. import time  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. channel.queue_declare(queue='task_queue', durable=True)  
  10. print ' [*] Waiting for messages. To exit press CTRL+C'  
  11.   
  12. def callback(ch, method, properties, body):  
  13.     print " [x] Received %r" % (body,)  
  14.     time.sleep( body.count('.') )  
  15.     print " [x] Done"  
  16.     ch.basic_ack(delivery_tag = method.delivery_tag)  
  17.   
  18. channel.basic_qos(prefetch_count=1)  
  19. channel.basic_consume(callback,  
  20.                       queue='task_queue')  
  21.   
  22. channel.start_consuming()  
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

 

 

轉載自:anzhsoft  http://blog.csdn.net/anzhsoft/article/details/19607841

          

參考資料:

1. http://www.rabbitmq.com/tutorials/tutorial-two-python.html                                   

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM