python之rabbitMQ二:隊列、消息持久化


一、隊列持久化

 聲明隊列queue_declare方法的原型 :

channel.queue_declare(queue='', passive=False, durable=False,
                      exclusive=False, auto_delete=False, arguments=None):

queue: 隊列名稱

durable: 是否持久化, 隊列的聲明默認是False,即存放到內存中的,如果rabbitmq重啟會丟失。

  如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫。

exclusive:是否排外的,默認為False,不排外。有兩個作用:

  一:當連接關閉時connection.close()該隊列是否會自動刪除;

  二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題;

  如果是排外的,會對當前隊列加鎖,只允許當前消費者可以訪問,其他通道channel是不能訪問的,如果強制訪問會報異常:ShutdownSignalException: channel error

  一般等於true的話用於一個隊列只能有一個消費者來消費的場景 。

auto_delete:是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,默認為False。

  可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時,即沒有任務消費者時,隊列就會自動刪除

arguments: 
隊列中的消息什么時候會自動被刪除?
Message TTL(x-message-ttl):設置隊列中的所有消息的生存周期(統一為整個隊列的所有消息設置生命周期),

也可以在發布消息的時候單獨為某個消息指定剩余生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,消息會被從隊里中刪除,注意是消息被刪除,而不是隊列被刪除, 特性Features=TTL, 單獨為某條消息設置過期時間:properties=pika.BasicProperties(.........)

 


Auto Expire(x-expires): 當隊列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp

Max Length(x-max-length): 限定隊列的消息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如保存最新的100條消息, Feature=Lim

Max Length Bytes(x-max-length-bytes): 限定隊列最大占用的空間大小, 一般受限於內存、磁盤的大小, Features=Lim B

Dead letter exchange(x-dead-letter-exchange): 當隊列消息長度大於最大長度、或者過期的等,將從隊列中刪除的消息推送到指定的交換機中去而不是丟棄掉,Features=DLX

Dead letter routing key(x-dead-letter-routing-key):將刪除的消息推送到指定交換機的指定路由鍵的隊列中去, Feature=DLK

Maximum priority(x-max-priority):優先級隊列,聲明隊列時先定義最大優先級值(定義最大值一般不要太大),在發布消息的時候指定該消息的優先級, 優先級更高(數值更大的)的消息先被消費,

Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將消息保存到磁盤上,不放在內存中,當消費者開始消費的時候才加載到內存中
Master locator(x-queue-master-locator) 

關於隊列的聲明,如果使用同一套參數進行聲明了,就不能再使用其他參數來聲明,要么刪除該隊列重新刪除,可以使用命令行刪除也可以在RabbitMQ Management上刪除,要么給隊列重新起一個名字。

隊列持久化:

重啟RabbitMQ服務器(可以通過rabbitmqctl stop_app關閉服務器,rabbitmqctl start_app重啟服務器),可以登錄RabbitMQ Management—> Queues中。如果隊列設置了持久化,則可以看到之前聲明的隊列還存在。

 

二、消息持久化

消息確認機制:

如果消費消息時發生異常,隊列中也沒有了消息,服務器無法知道此消息是否成功,此時將發生丟失嗎?

不會,因為有消息確認機制:事實上,此條消息被消費者取出,隊列中沒有了此消息(但會暫時保存在其它地方)。但是rabbitMQ有消息消費成功與否的確認機制,如果消息異常,即失敗,此條消息將從另外一個地方重新放回隊列中;如果成功才會根據配置在多長時間內刪除這條消息。

消息持久化:

如果消息服務器宕機,服務器中的隊列和消息是否會被保存?

如果沒有啟用消息持久化(默認值),消息是保存在內存中的,宕機將丟失隊列 和消息。

如果設置了隊列、消息持久化,則會保存在erlang自帶的數據庫中,重啟服務器后將恢復隊列和消息。

 設置消息持久化必須先設置隊列持久化,要不然隊列不持久化,消息持久化,隊列都不存在了,消息存在還有什么意義。消息持久化需要將交換機持久化、隊列持久化、消息持久化,才能最終達到持久化的目的。

為單條消息設置持久化:發布消息時,設置參數properties=pika.BasicProperties(delivery_mode=2),2為持久化,1為非持久化。

channel.basic_publish(exchange='',
                      routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))

Message TTL消息剩余生存時間
為該隊列的所有消息統一設置相同的聲明周期:統一設置隊列中的所有消息的過期時間,例如設置10秒,10秒后這個隊列的消息清零

arguments.put("x-message-ttl", 10000);

// 聲明隊列時指定隊列中的消息過期時間 channel.queue_declare(QUEUE_NAME, false, false, false, arguments); 

Auto Expire自動過期
x-expires用於當多長時間沒有消費者訪問該隊列的時候,該隊列會自動刪除,可以設置一個延遲時間,如僅啟動一個生產者,10秒之后該隊列會刪除,或者啟動一個生產者,再啟動一個消費者,消費者運行結束后10秒,隊列也會被刪除

Max Length最大長度
x-max-length:用於指定隊列的長度,如果不指定,可以認為是無限長,例如指定隊列的長度是4,當超過4條消息,前面的消息將被刪除,給后面的消息騰位

Max Length Bytes代碼片段
x-max-length-bytes: 用於指定隊列存儲消息的占用空間大小,當達到最大值是會刪除之前的數據騰出空間

Maximum priority最大優先級
x-max-priority: 設置消息的優先級,優先級值越大,越被提前消費。

正常情況下不適用優先級 
Hello RabbitMQ: 1 
Hello RabbitMQ: 2 
Hello RabbitMQ: 3 
Hello RabbitMQ: 4 
Hello RabbitMQ: 5

使用優先級順序正好相反 
Hello RabbitMQ: 5 
Hello RabbitMQ: 4 
Hello RabbitMQ: 3 
Hello RabbitMQ: 2 
Hello RabbitMQ: 1

 

三、示例 

一對一的生產者、消費者的消息隊列模式:

生產者:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 此消息持久化
                      ))
print(" [x] Sent %r" % message)
connection.close()

 

消費者:

#!/usr/bin/env python
import pika
import time


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # 消息確認:消費者完成消費后,發送確認消息給服務器
    ch.basic_ack(delivery_tag=method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue',
                      # no_ack=False  # 默認為False
                      )

channel.start_consuming()

 

1.如果消費者,從隊列中取到消息,但消費失敗了?怎么保證此條消息會退還到隊列中,能夠被其它消息者獲取到?

消費者在消費成功時,發送消息確認即可;在代碼中,必須這兩個地方都要實現:

  • 在callback消費者函數中,發送確認消息:ch.basic_ack(delivery_tag=method.delivery_tag)
  • 消費都的channel.basic_consume中的no_ack參數使用默認值False

2.如果rabbitMQ服務器掛了,怎么保證在服務器重啟用,隊列中的消息不丟失?

隊列持久化,且消息持久化。必須兩者都持久化。代碼:

  • 在聲明隊列時,使用參數durable=True使隊列持久化:
    • channel.queue_declare(queue='task_queue', durable=True)
  • 設置隊列的Message TTL消息剩余生存時間
  • 或者生產者在發布消息時,使用參數properties=pika.BasicProperties(delivery_mode=2)使消息持久化:channel.basic_publish(exchange='',
 routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 此消息持久化  )) 

四、公平派遣

您可能已經注意到調度仍然無法完全按照我們的要求工作。例如,在有兩名工人的情況下,當所有奇怪的信息都很重,甚至信息很少時,一名工作人員會一直很忙, 另一名工作人員幾乎不會做任何工作。那么,RabbitMQ不知道任何有關這一點,並仍將均勻地發送消息。

發生這種情況是因為RabbitMQ只在消息進入隊列時調度消息。它沒有考慮消費者未確認消息的數量。它只是盲目地將第n條消息分發給第n位消費者。

為了解決這個問題,我們可以使用basic.qos方法和設置prefetch_count = 1。這告訴RabbitMQ一次不要向工作人員發送多個消息。 或者換句話說,不要向工作人員發送新消息,直到它處理並確認了前一個消息。相反,它會將其分派給不是仍然忙碌的下一個工作人員。

channel.basic_qos(prefetch_count=1)

 

示例:

生產者:new_task.py

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or 'Hello World'

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 確保消息是持久的
                      ))
print(" [x] Sent %r" % message)
connection.close()

 

消費者:worker.py

#!/usr/bin/env python
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello')
channel.basic_qos(prefetch_count=1)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

使用消息確認和prefetch_count,您可以設置一個工作隊列。即使RabbitMQ重新啟動,持久性選項也可讓任務繼續存在。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM