https://blog.csdn.net/appleyuchi/article/details/79190113
隊列和消息是兩個概念?
假如消息隊列test里面還有消息等待消費者(consumers)去接收,但是這個時候服務器端宕機了,這個時候消息是否還在?
1、隊列消息非持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import
pika
# 聲明一個socket 實例
connect
=
pika.BlockingConnection(pika.ConnectionParameters(
"localhost"
))
# 聲明一個管道
channel
=
connect.channel()
# 聲明queue名稱為test
channel.queue_declare(queue
=
"test"
)
#RabbitMQ的消息永遠不會被直接發送到隊列中,它總是需要經過一次交換
channel.basic_publish(exchange
=
'',
routing_key
=
"test"
,
body
=
"hello word"
)
print
(
"Sent 'hello world'"
)
connect.close()
channel.queue_declare(queue
=
"test"
)
①隊列持久化很簡單,只需要在服務端(produce)聲明queue的時候添加一個參數:
channel.queue_declare(queue
=
'shuaigaogao'
, durable
=
True
)
# durable=True 持久化
channel.basic_publish(exchange
=
"",
routing_key
=
"shuaigaogao"
,
#queue的名字
body
=
"hello world"
,
#body是要發送的內容
properties
=
pika.BasicProperties(delivery_mode
=
2
,)
# make message persistent=>使消息持久化的特性
)
1
channel.queue_declare(queue
=
'shuaigaogao'
, durable
=
True
)
小結:
RabbitMQ在服務端沒有聲明隊列和消息持久化時,隊列和消息是存在內存中的,服務端宕機了,隊列和消息也不會保留。
- 服務端聲明持久化,客戶端想接受消息的話,必須也要聲明queue時,也要聲明持久化,不然的話,客戶端執行會報錯。
以上兩句是整篇文章的重中之重!!!
RabbitMQ 消息公平分發
可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count
=
1
)
服務端:
import
pika
# 聲明一個socket 實例
connect
=
pika.BlockingConnection(pika.ConnectionParameters(
"localhost"
))
# 聲明一個管道
channel
=
connect.channel()
# 聲明queue名稱為test
channel.queue_declare(queue
=
"test"
, durable
=
True
)
# 隊列持久化
#RabbitMQ的消息永遠不會被直接發送到隊列中,它總是需要經過一次交換
channel.basic_publish(exchange
=
'',
routing_key
=
"test"
,
body
=
"hello word"
,
properties
=
pika.BasicProperties(delivery_mode
=
2
,))
# 消息持久化
print
(
"Sent 'hello world'"
)
connect.close()
import
pika
import
time
# 聲明socket實例
connect
=
pika.BlockingConnection(pika.ConnectionParameters(
"localhost"
))
# 聲明一個管道 雖然在之前的produce代碼中聲明過一次管道,
# 但是在不知道produce中的管道是否運行之前(如果未運行,consumers中也不聲明的話就會報錯),
# 在consumers中也聲明一次是一種正確的做法
channel
=
connect.channel()
#聲明queue
channel.queue_declare(queue
=
"test"
, durable
=
True
)
#回調函數
def
callback(ch, method, properites, body):
time.sleep(
30
)
print
(
"-----"
, ch, method, properites, body)
print
(
"Received %r"
%
body)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
# 手動確認收到消息,添加手動確認時,no_ack必須為False,不然就會報錯
channel.basic_qos(prefetch_count
=
1
)
# 在消息消費之前加上消息處理配置
channel.basic_consume(callback,
queue
=
"test"
,
no_ack
=
False
)
print
(
"Waiting for messages"
)
#這個start只要一啟動,就一直運行,它不止收一條,而是永遠收下去,沒有消息就在這邊卡住
channel.start_consuming()