消息隊列RabbitMQ的python接口使用


消息隊列是一種常用的開發中間件,適用於異步、分布式、解耦合等業務場景中,而RabbitMQ是其中一種常用的消息隊列,今天來總結一下RabbitMQ在python端的使用方法。

1.發送接收基礎

python一般使用pika庫來操作RabbitMQ,需要先用pip安裝。

#1 建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 建立一個連接
channel = connection.channel()               # 建立此連接下的一個頻道
channel.queue_declare(queue='hello')         # 聲明一個隊列

#2 發送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

#3 接收消息
channel.basic_consume(on_message_callback=callback, queue='hello')  # 聲明消息處理回調函數
channel.start_consuming()                                           # 開始消費消息,並進入死循環

def callback(channel, method, properties, body):
    print("Received %r" % (body,))                         # body是消息體,根據具體業務進行解析處理
    time.sleep(5)                                          # 模擬處理消息
    channel.basic_ack(delivery_tag = method.delivery_tag)  # 處理完成后,發送ack進行消息確認,消息在服務端安全刪除

#4 關閉連接
connection.close()

2.完整接口示例

import time
import random
import pika
from pika.exceptions import ChannelClosed, ConnectionClosed

# rabbitmq 配置信息
MQ_CONFIG = {
    "hostname": "127.0.0.1",
    "port": 8080,
    "vhost": "/",
    "username": "guest",
    "password": "guest",
    "exchange": "exchange",
    "queue": "queue",
    "routing_key": "key"
}

# 消息隊列基類
class RabbitMQServer(object):
    def __init__(self):
        self.config = MQ_CONFIG                           # 配置文件加載
        self.host = self.config.get("hostname")           # 主機
        self.port = self.config.get("port")               # 端口
        self.username = self.config.get("username")       # 用戶名
        self.password = self.config.get("password")       # 密碼
        self.vhost = self.config.get("vhost")             # 虛擬主機,VirtualHost之間相互隔離
        self.exchange = self.config.get("exchange")       # 交換機
        self.queue = self.config.get("queue")             # 隊列
        self.routing_key = self.config.get("routing_key") # 交換機和隊列的綁定

        self.connection = None
        self.channel = None

    def reconnect(self, heartbeat=True):
        try:
            # 關閉舊的連接
            if self.connection and not self.connection.is_closed:
                self.connection.close()

            # 構造登錄參數
            credentials = pika.PlainCredentials(self.username, self.password)
            parameters = pika.ConnectionParameters(self.host, self.port, self.vhost, credentials)         
                              
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            # 聲明交換機
            self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct", durable=True)  

            # 消費者實例
            if isinstance(self, RabbitComsumer):
                self.channel.queue_declare(queue=self.queue, exclusive=True, durable=True)
                # 綁定隊列
                self.channel.queue_bind(exchange=self.exchange, queue=self.queue, routing_key=self.routing_key)  
                # 表明最大阻塞未ack的消息數量
                self.channel.basic_qos(prefetch_count=1)                                                         
                self.channel.basic_consume(on_message_callback=self.consumer_callback, queue=self.queue, auto_ack=False)
            # 生產者實例
            elif isinstance(self, RabbitPublisher):
                self.channel.queue_declare(queue=self.queue, exclusive=False, durable=True)
            
        except Exception as e:
            print(e)

def excecute(body):
    pass

# 消費者
class RabbitComsumer(RabbitMQServer):
    def __init__(self):
        super(RabbitComsumer, self).__init__()

    def consumer_callback(self, channel, method, properties, body):
        result = execute(body)  # 模擬處理消息
        if channel.is_open:
            if result:
                channel.basic_ack(delivery_tag=method.delivery_tag)
            else:
                # 處理不成功時,發送no_ack
                channel.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=True) 
        if not channel.is_open:
            print("Callback 接收頻道關閉,無法ack")

    def start_consumer(self):
        while True:
            try:
                self.reconnect()
                self.channel.start_consuming()
            except ConnectionClosed as e:        # 保證連接斷開重連
                self.reconnect()
                time.sleep(2)
            except ChannelClosed as e:           # 保證連接斷開重連
                self.reconnect()
                time.sleep(2)
            except Exception as e:
                self.reconnect()
                time.sleep(2)

# 生產者
class RabbitPublisher(RabbitMQServer):
    def __init__(self):
        super(RabbitPublisher, self).__init__()

    def start_publish(self):
        self.reconnect()
        i = 1
        while True:
            message = {"value": i}
            try:
                self.channel.basic_publish(exchange=self.exchange, routing_key=self.routing_key, body=message)
                i += 1
                time.sleep(2)
            except ConnectionClosed as e:
                self.reconnect()
                time.sleep(2)
            except ChannelClosed as e:
                self.reconnect()
                time.sleep(2)
            except Exception as e:
                self.reconnect()
                time.sleep(2)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM