rabbitmq消息消費




消費方法


Basic.Get

  • 每次接收消息必須發送一次請求
  • 有消息可用,RabbitMQ返回Basic.GetOk以及消息
  • 無消息可用,RabbitMQ返回Basic.GetEmpty 應用程序需要評估RPC響應以及是否接收到消息。

示例程序

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'test-messages')
        queue.declare()
        while True:
            message = queue.get()
            if message:
                message.pprint()
                # 確認消息
                message.ack()
                if message.body == 'stop':
                    break

Basic.Consume

  • 消費者可用時,異步方式發送消息
  • 應用程序自動接收消息,直到Basic.Cancel
  • 仍然需要確認消息

示例程序

import rabbitpy

for message in rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f',
                                'test-messages'):
    message.pprint()
    # 消息確認
    message.ack()

消費者標簽
應用程序發出Basic.Comsume時,創建唯一字符串(消費者標簽),標識應用程序。RabbitMQ每次都會把該字符串和消息一同發送給應用程序。
客戶端庫對消費者標簽封裝,以確定如何處理消息。開發者不用處理消費者標簽。

示例代碼:監聽消息直到,收到停止消息

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        for message in rabbitpy.Queue(channel, 'test-messages'):
            message.pprint()
            message.ack()
            if message.body == 'stop':
                break

對比

Consume吞吐量更大。Get包含了每條消息的同步通信開銷。


消費性能優化


1、no-ack

應用程序發送Basic.Comsume請求時,設置no-ack。表明消費者不進行消費確認。

示例代碼:消費不確認

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'test-messages')
        for message in queue.consume_messages(no_ack=True):
            message.pprint()

2、預取

QoS(Quality of service)中,可設置消費者預先接收一定數量的消息。Basic.Qos一般在Basic.Consume之前設置。

示例程序:指定QoS

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        #預取數為10
        channel.prefetch_count(10)
        for message in rabbitpy.Queue(channel, 'test-messages'):
            message.pprint()
            message.ack()

應用程序不需要確認每條消息,可確認所有以前未讀消息。

示例程序:多消息確認

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        channel.prefetch_count(10)
        for message in rabbitpy.Queue(channel, 'test-messages'):
            message.pprint()
            unacknowledged += 1
            if unacknowledged == 10:
                # 確認所有未確認消息
                message.ack(all_previous=True)
                unacknowledged = 0

3、事務

事務允許消費者應用程序提交和回滾批量操作。不適用QoS時,可以獲得輕微的性能提升。


拒絕消息


Basic.Reject

通知rabbitmq無法處理投遞的消息(拒絕一個消息),可指示rabbitMQ丟棄消息或使用requeue重發消息。

示例程序:消息拒絕

import rabbitpy

for message in rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f',
                                'test-messages'):
    message.pprint()
    print('Redelivered: %s' % message.redelivered)
    message.reject(True)

Basic.Nack

同時拒絕多個消息

死信交換器(DLX)

創建隊列時聲明該交換器用於保存被拒絕的消息。隊列x-dead-letter-exchange參數(RPC請求)指定死信交換器。

示例程序:指定死信交換器

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        #死信交換器
        rabbitpy.Exchange(channel, 'rejected-messages').declare()
        queue = rabbitpy.Queue(channel, 'dlx-example',
                               dead_letter_exchange='rejected-messages')
        queue.declare()


控制隊列


臨時隊列

自動刪除隊列
消費者完成連接和檢索消息,所有消費者斷開連接時,隊列將被刪除。

示例程序:自動刪除隊列auto_delete=True

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'ad-example', auto_delete=True)
        queue.declare()

只允許單個消費者
只有單個消費者能夠消費隊列中的消息。消費者斷開連接后,會自動刪除隊列。

示例程序:獨占隊列exclusive

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'exclusive-example',
                               exclusive=True)
        queue.declare()

自動過期隊列
如果一段時間沒有使用該隊列就刪除它,一般用於RPC回復隊列。

示例程序:自動過期隊列

import rabbitpy
import time

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'expiring-queue',
                               arguments={'x-expires': 1000})
        queue.declare()
        messages, consumers = queue.declare(passive=True)
        time.sleep(2)
        try:
            messages, consumers = queue.declare(passive=True)
        except rabbitpy.exceptions.AMQPNotFound:
            print('The queue no longer exists')

永久隊列

隊列持久性
服務器重啟后隊列仍然存在。
示例程序:持久隊列

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'durable-queue',
                               durable=True)
        if queue.declare():
            print('Queue declared')

隊列消息自動過期
同時指定死信交換器和消息TTL,過期消息將成為死信消息。

示例程序:消息TTL

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'expiring-msg-queue',
                               arguments={'x-message-ttl': 1000})
         queue.declare()

最大隊列長度
一旦達到最大值,添加新消息時,刪除隊列前端的消息。聲明隊列時,如果指定死信交換器,前端移除的消息將成為死信。

示例程序:最大長度隊列

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'max-length-queue',
                               arguments={'x-max-length': 1000})
        queue.declare()

隊列設置參數

參數 說明
x-dead-letter-exchange 死信交換器,路由不重發且被拒絕的消息
x-dead-letter-routing-key 死信消息的可選路由鍵
x-expires 隊列在指定的毫秒數后刪除
x-ha-proxy 創建HA隊列
x-ha-nodes HA隊列分布的節點
x-max-length 隊列的最大消息數
x-message-ttl 毫秒為單位的隊列過期時間
x-max-priority 隊列優先級排序


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM