消費方法
Basic.Get
- 每次接收消息必須發送一次請求
- 有消息可用,RabbitMQ返回Basic.GetOk以及消息
- 無消息可用,RabbitMQ返回Basic.GetEmpty 應用程序需要評估RPC響應以及是否接收到消息。
示例程序
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'test-messages')
queue.declare()
while True:
message = queue.get()
if message:
message.pprint()
# 確認消息
message.ack()
if message.body == 'stop':
break
Basic.Consume
- 消費者可用時,異步方式發送消息
- 應用程序自動接收消息,直到Basic.Cancel
- 仍然需要確認消息
示例程序
import rabbitpy
for message in rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f',
'test-messages'):
message.pprint()
# 消息確認
message.ack()
消費者標簽
應用程序發出Basic.Comsume時,創建唯一字符串(消費者標簽),標識應用程序。RabbitMQ每次都會把該字符串和消息一同發送給應用程序。
客戶端庫對消費者標簽封裝,以確定如何處理消息。開發者不用處理消費者標簽。
示例代碼:監聽消息直到,收到停止消息
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
message.ack()
if message.body == 'stop':
break
對比
Consume吞吐量更大。Get包含了每條消息的同步通信開銷。
消費性能優化
1、no-ack
應用程序發送Basic.Comsume請求時,設置no-ack。表明消費者不進行消費確認。
示例代碼:消費不確認
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'test-messages')
for message in queue.consume_messages(no_ack=True):
message.pprint()
2、預取
QoS(Quality of service)中,可設置消費者預先接收一定數量的消息。Basic.Qos一般在Basic.Consume之前設置。
示例程序:指定QoS
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
#預取數為10
channel.prefetch_count(10)
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
message.ack()
應用程序不需要確認每條消息,可確認所有以前未讀消息。
示例程序:多消息確認
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
channel.prefetch_count(10)
for message in rabbitpy.Queue(channel, 'test-messages'):
message.pprint()
unacknowledged += 1
if unacknowledged == 10:
# 確認所有未確認消息
message.ack(all_previous=True)
unacknowledged = 0
3、事務
事務允許消費者應用程序提交和回滾批量操作。不適用QoS時,可以獲得輕微的性能提升。
拒絕消息
Basic.Reject
通知rabbitmq無法處理投遞的消息(拒絕一個消息),可指示rabbitMQ丟棄消息或使用requeue重發消息。
示例程序:消息拒絕
import rabbitpy
for message in rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f',
'test-messages'):
message.pprint()
print('Redelivered: %s' % message.redelivered)
message.reject(True)
Basic.Nack
同時拒絕多個消息
死信交換器(DLX)
創建隊列時聲明該交換器用於保存被拒絕的消息。隊列x-dead-letter-exchange參數(RPC請求)指定死信交換器。
示例程序:指定死信交換器
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
#死信交換器
rabbitpy.Exchange(channel, 'rejected-messages').declare()
queue = rabbitpy.Queue(channel, 'dlx-example',
dead_letter_exchange='rejected-messages')
queue.declare()
控制隊列
臨時隊列
自動刪除隊列
消費者完成連接和檢索消息,所有消費者斷開連接時,隊列將被刪除。
示例程序:自動刪除隊列auto_delete=True
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'ad-example', auto_delete=True)
queue.declare()
只允許單個消費者
只有單個消費者能夠消費隊列中的消息。消費者斷開連接后,會自動刪除隊列。
示例程序:獨占隊列exclusive
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'exclusive-example',
exclusive=True)
queue.declare()
自動過期隊列
如果一段時間沒有使用該隊列就刪除它,一般用於RPC回復隊列。
示例程序:自動過期隊列
import rabbitpy
import time
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'expiring-queue',
arguments={'x-expires': 1000})
queue.declare()
messages, consumers = queue.declare(passive=True)
time.sleep(2)
try:
messages, consumers = queue.declare(passive=True)
except rabbitpy.exceptions.AMQPNotFound:
print('The queue no longer exists')
永久隊列
隊列持久性
服務器重啟后隊列仍然存在。
示例程序:持久隊列
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'durable-queue',
durable=True)
if queue.declare():
print('Queue declared')
隊列消息自動過期
同時指定死信交換器和消息TTL,過期消息將成為死信消息。
示例程序:消息TTL
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'expiring-msg-queue',
arguments={'x-message-ttl': 1000})
queue.declare()
最大隊列長度
一旦達到最大值,添加新消息時,刪除隊列前端的消息。聲明隊列時,如果指定死信交換器,前端移除的消息將成為死信。
示例程序:最大長度隊列
import rabbitpy
with rabbitpy.Connection() as connection:
with connection.channel() as channel:
queue = rabbitpy.Queue(channel, 'max-length-queue',
arguments={'x-max-length': 1000})
queue.declare()
隊列設置參數
參數 | 說明 |
---|---|
x-dead-letter-exchange | 死信交換器,路由不重發且被拒絕的消息 |
x-dead-letter-routing-key | 死信消息的可選路由鍵 |
x-expires | 隊列在指定的毫秒數后刪除 |
x-ha-proxy | 創建HA隊列 |
x-ha-nodes | HA隊列分布的節點 |
x-max-length | 隊列的最大消息數 |
x-message-ttl | 毫秒為單位的隊列過期時間 |
x-max-priority | 隊列優先級排序 |