RabbitMQ 隊列消息持久化
假如消息隊列test里面還有消息等待消費者(consumers)去接收,但是這個時候服務器端宕機了,這個時候消息是否還在?
1、隊列消息非持久化
服務端(producer):
import pika # 聲明一個socket 實例 connect = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 聲明一個管道 channel = connect.channel() # 聲明queue名稱為test channel.queue_declare(queue="test") #RabbitMQ的消息永遠不會被直接發送到隊列中,它總是需要經過一次交換 channel.basic_publish(exchange='', routing_key="test", body="hello word") print("Sent 'hello world'") connect.close()
客戶端(consumers):
import pika import time # 聲明socket實例 connect = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 聲明一個管道 雖然在之前的produce代碼中聲明過一次管道, # 但是在不知道produce中的管道是否運行之前(如果未運行,consumers中也不聲明的話就會報錯), # 在consumers中也聲明一次是一種正確的做法 channel = connect.channel() #聲明queue channel.queue_declare(queue="test") #回調函數 def callback(ch, method, properites, body): time.sleep(30) print("-----", ch, method, properites, body) print("Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 手動確認收到消息,添加手動確認時,no_ack必須為False,不然就會報錯 channel.basic_consume(callback, queue="test", no_ack=False) print("Waiting for messages") #這個start只要一啟動,就一直運行,它不止收一條,而是永遠收下去,沒有消息就在這邊卡住 channel.start_consuming()
上面的服務端和客戶端聲明queue的方式都是非持久的
channel.queue_declare(queue="test")
①服務端先發送往test隊列里發送兩條消息
②通過運行--services.msc進入服務重新啟動RabbitMQ
③再次查看消息隊列queue中的消息數量
通過小實驗可以看出,非持久聲明的queue,在服務端宕機后,消息隊列queue和消息都不復存在了
2、隊列消息持久化:
①隊列持久化很簡單,只需要在服務端(produce)聲明queue的時候添加一個參數:
channel.queue_declare(queue='shuaigaogao', durable=True) # durable=True 持久化
②僅僅持久化隊列是沒有意義的,還需要多消息進行持久化
channel.basic_publish(exchange="", routing_key="shuaigaogao", #queue的名字 body="hello world", #body是要發送的內容 properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性 )
③最后一步,在服務端隊列消息都持久化了之后需要在客戶端聲明queue的時候也持久化
channel.queue_declare(queue='shuaigaogao', durable=True)
這樣就算再傳遞消息過程中,服務端的發生宕機,消息和隊列也不會丟失
小結:
- RabbitMQ在服務端沒有聲明隊列和消息持久化時,隊列和消息是存在內存中的,服務端宕機了,隊列和消息也不會保留。
- 服務端聲明持久化,客戶端想接受消息的話,必須也要聲明queue時,也要聲明持久化,不然的話,客戶端執行會報錯。
RabbitMQ 消息公平分發
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count=1)
通俗的講就是消費者有多大本事,就干多少活,消費者處理的越慢,其消息分配分發的就少,反之消費者消息處理的多,處理的快,就可以多向這個消費者分配一些消息。服務端給客戶端發消息的時候,先檢查一下,這個消費者現在還有多少消息,如果處理的消息超過1條,就不給這個消費者發送消息了
隊列消息持久化+公平分發示列:
服務端:
import pika # 聲明一個socket 實例 connect = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 聲明一個管道 channel = connect.channel() # 聲明queue名稱為test channel.queue_declare(queue="test", durable=True) # 隊列持久化 #RabbitMQ的消息永遠不會被直接發送到隊列中,它總是需要經過一次交換 channel.basic_publish(exchange='', routing_key="test", body="hello word", properties=pika.BasicProperties(delivery_mode=2,)) # 消息持久化 print("Sent 'hello world'") connect.close()
客戶端:
import pika import time # 聲明socket實例 connect = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 聲明一個管道 雖然在之前的produce代碼中聲明過一次管道, # 但是在不知道produce中的管道是否運行之前(如果未運行,consumers中也不聲明的話就會報錯), # 在consumers中也聲明一次是一種正確的做法 channel = connect.channel() #聲明queue channel.queue_declare(queue="test", durable=True) #回調函數 def callback(ch, method, properites, body): time.sleep(30) print("-----", ch, method, properites, body) print("Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 手動確認收到消息,添加手動確認時,no_ack必須為False,不然就會報錯 channel.basic_qos(prefetch_count=1) # 在消息消費之前加上消息處理配置 channel.basic_consume(callback, queue="test", no_ack=False) print("Waiting for messages") #這個start只要一啟動,就一直運行,它不止收一條,而是永遠收下去,沒有消息就在這邊卡住 channel.start_consuming()