一、隊列持久化
聲明隊列queue_declare方法的原型 :
channel.queue_declare(queue='', passive=False, durable=False,
exclusive=False, auto_delete=False, arguments=None):
queue: 隊列名稱
durable: 是否持久化, 隊列的聲明默認是False,即存放到內存中的,如果rabbitmq重啟會丟失。
如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫。
exclusive:是否排外的,默認為False,不排外。有兩個作用:
一:當連接關閉時connection.close()該隊列是否會自動刪除;
二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題;
如果是排外的,會對當前隊列加鎖,只允許當前消費者可以訪問,其他通道channel是不能訪問的,如果強制訪問會報異常:ShutdownSignalException: channel error
一般等於true的話用於一個隊列只能有一個消費者來消費的場景 。
auto_delete:是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,默認為False。
可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時,即沒有任務消費者時,隊列就會自動刪除
arguments:
隊列中的消息什么時候會自動被刪除?
Message TTL(x-message-ttl):設置隊列中的所有消息的生存周期(統一為整個隊列的所有消息設置生命周期),
也可以在發布消息的時候單獨為某個消息指定剩余生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,消息會被從隊里中刪除,注意是消息被刪除,而不是隊列被刪除, 特性Features=TTL, 單獨為某條消息設置過期時間:properties=pika.BasicProperties(.........)
Auto Expire(x-expires): 當隊列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp
Max Length(x-max-length): 限定隊列的消息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如保存最新的100條消息, Feature=Lim
Max Length Bytes(x-max-length-bytes): 限定隊列最大占用的空間大小, 一般受限於內存、磁盤的大小, Features=Lim B
Dead letter exchange(x-dead-letter-exchange): 當隊列消息長度大於最大長度、或者過期的等,將從隊列中刪除的消息推送到指定的交換機中去而不是丟棄掉,Features=DLX
Dead letter routing key(x-dead-letter-routing-key):將刪除的消息推送到指定交換機的指定路由鍵的隊列中去, Feature=DLK
Maximum priority(x-max-priority):優先級隊列,聲明隊列時先定義最大優先級值(定義最大值一般不要太大),在發布消息的時候指定該消息的優先級, 優先級更高(數值更大的)的消息先被消費,
Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將消息保存到磁盤上,不放在內存中,當消費者開始消費的時候才加載到內存中
Master locator(x-queue-master-locator)
關於隊列的聲明,如果使用同一套參數進行聲明了,就不能再使用其他參數來聲明,要么刪除該隊列重新刪除,可以使用命令行刪除也可以在RabbitMQ Management上刪除,要么給隊列重新起一個名字。
隊列持久化:
重啟RabbitMQ服務器(可以通過rabbitmqctl stop_app關閉服務器,rabbitmqctl start_app重啟服務器),可以登錄RabbitMQ Management—> Queues中。如果隊列設置了持久化,則可以看到之前聲明的隊列還存在。
二、消息持久化
消息確認機制:
如果消費消息時發生異常,隊列中也沒有了消息,服務器無法知道此消息是否成功,此時將發生丟失嗎?
不會,因為有消息確認機制:事實上,此條消息被消費者取出,隊列中沒有了此消息(但會暫時保存在其它地方)。但是rabbitMQ有消息消費成功與否的確認機制,如果消息異常,即失敗,此條消息將從另外一個地方重新放回隊列中;如果成功才會根據配置在多長時間內刪除這條消息。
消息持久化:
如果消息服務器宕機,服務器中的隊列和消息是否會被保存?
如果沒有啟用消息持久化(默認值),消息是保存在內存中的,宕機將丟失隊列 和消息。
如果設置了隊列、消息持久化,則會保存在erlang自帶的數據庫中,重啟服務器后將恢復隊列和消息。
設置消息持久化必須先設置隊列持久化,要不然隊列不持久化,消息持久化,隊列都不存在了,消息存在還有什么意義。消息持久化需要將交換機持久化、隊列持久化、消息持久化,才能最終達到持久化的目的。
為單條消息設置持久化:發布消息時,設置參數properties=pika.BasicProperties(delivery_mode=2),2為持久化,1為非持久化。
channel.basic_publish(exchange='',
routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
Message TTL消息剩余生存時間
為該隊列的所有消息統一設置相同的聲明周期:統一設置隊列中的所有消息的過期時間,例如設置10秒,10秒后這個隊列的消息清零
arguments.put("x-message-ttl", 10000);
// 聲明隊列時指定隊列中的消息過期時間 channel.queue_declare(QUEUE_NAME, false, false, false, arguments);
Auto Expire自動過期
x-expires用於當多長時間沒有消費者訪問該隊列的時候,該隊列會自動刪除,可以設置一個延遲時間,如僅啟動一個生產者,10秒之后該隊列會刪除,或者啟動一個生產者,再啟動一個消費者,消費者運行結束后10秒,隊列也會被刪除
Max Length最大長度
x-max-length:用於指定隊列的長度,如果不指定,可以認為是無限長,例如指定隊列的長度是4,當超過4條消息,前面的消息將被刪除,給后面的消息騰位
Max Length Bytes代碼片段
x-max-length-bytes: 用於指定隊列存儲消息的占用空間大小,當達到最大值是會刪除之前的數據騰出空間
Maximum priority最大優先級
x-max-priority: 設置消息的優先級,優先級值越大,越被提前消費。
正常情況下不適用優先級
Hello RabbitMQ: 1
Hello RabbitMQ: 2
Hello RabbitMQ: 3
Hello RabbitMQ: 4
Hello RabbitMQ: 5
使用優先級順序正好相反
Hello RabbitMQ: 5
Hello RabbitMQ: 4
Hello RabbitMQ: 3
Hello RabbitMQ: 2
Hello RabbitMQ: 1
三、示例
一對一的生產者、消費者的消息隊列模式:
生產者:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 此消息持久化 )) print(" [x] Sent %r" % message) connection.close()
消費者:
#!/usr/bin/env python import pika import time def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") # 消息確認:消費者完成消費后,發送確認消息給服務器 ch.basic_ack(delivery_tag=method.delivery_tag) connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue', # no_ack=False # 默認為False ) channel.start_consuming()
1.如果消費者,從隊列中取到消息,但消費失敗了?怎么保證此條消息會退還到隊列中,能夠被其它消息者獲取到?
消費者在消費成功時,發送消息確認即可;在代碼中,必須這兩個地方都要實現:
- 在callback消費者函數中,發送確認消息:ch.basic_ack(delivery_tag=method.delivery_tag)
- 消費都的channel.basic_consume中的no_ack參數使用默認值False
2.如果rabbitMQ服務器掛了,怎么保證在服務器重啟用,隊列中的消息不丟失?
隊列持久化,且消息持久化。必須兩者都持久化。代碼:
- 在聲明隊列時,使用參數durable=True使隊列持久化:
- channel.queue_declare(queue='task_queue', durable=True)
- 設置隊列的Message TTL消息剩余生存時間
- 或者生產者在發布消息時,使用參數properties=pika.BasicProperties(delivery_mode=2)使消息持久化:channel.basic_publish(exchange='',
routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 此消息持久化 ))
四、公平派遣
您可能已經注意到調度仍然無法完全按照我們的要求工作。例如,在有兩名工人的情況下,當所有奇怪的信息都很重,甚至信息很少時,一名工作人員會一直很忙, 另一名工作人員幾乎不會做任何工作。那么,RabbitMQ不知道任何有關這一點,並仍將均勻地發送消息。
發生這種情況是因為RabbitMQ只在消息進入隊列時調度消息。它沒有考慮消費者未確認消息的數量。它只是盲目地將第n條消息分發給第n位消費者。
為了解決這個問題,我們可以使用basic.qos方法和設置prefetch_count = 1。這告訴RabbitMQ一次不要向工作人員發送多個消息。 或者換句話說,不要向工作人員發送新消息,直到它處理並確認了前一個消息。相反,它會將其分派給不是仍然忙碌的下一個工作人員。
channel.basic_qos(prefetch_count=1)
示例:
生產者:new_task.py
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or 'Hello World' channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 確保消息是持久的 )) print(" [x] Sent %r" % message) connection.close()
消費者:worker.py
#!/usr/bin/env python import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='hello') channel.basic_qos(prefetch_count=1) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
使用消息確認和prefetch_count,您可以設置一個工作隊列。即使RabbitMQ重新啟動,持久性選項也可讓任務繼續存在。