rabbitmq之python_pika模塊連接MQ使用(五)


前言

接下來使用python的pika模塊連接rabbitmq。

環境搭建

pip install pika

實例介紹

先從一個最簡單的生產者/消費者說起

# send.py
class SenderClient(object):
    def __init__(self, username, passwd, host="127.0.0.1", port=5672,
                 queuename='eeg', exchange='eeg',routing_key='eeg'):
        self.__host = host
        self.__port = port
        self.__username = username
        self.__passwd = passwd
        self.__queue = queuename
        self.__exchange = exchange
        self.__rout_key = routing_key

    def connect_mq(self):
        """
        連接mq
        :return:
        """
        try:
            # 創建一個連接對象
            if not hasattr(self, 'connection') or self.connection.is_closed:
                # 添加用戶名和密碼
                credentials = pika.PlainCredentials(self.__username, self.__passwd)
                # 配置連接參數
                parameters = pika.ConnectionParameters(host=self.__host, port=self.__port, credentials=credentials)
                self.connection = pika.BlockingConnection(parameters)
        except Exception as e:
            print(e)

    def channel_mq(self):
        """
        # 創建一個信道
        # 聲明隊列和交換機和綁定
        :return:
        """
        self.connect_mq()
        if not hasattr(self, 'channel') or self.channel.is_closed:
            self.channel = self.connection.channel()

        # 聲明一個隊列,durable參數聲明隊列持久化
        self.channel.queue_declare(queue=self.__queue, durable=True)
        self.channel.exchange_declare(exchange=self.__exchange, durable=True)
        # 交換機和隊列綁定
        self.channel.queue_bind(exchange=self.__exchange, queue=self.__queue, routing_key=self.__rout_key)

    def open_data(self, filename):
        """
        # 打開一個數據文件
        :return:
        """
        with open(filename, 'r', encoding='utf-8') as f:
            data = f.read()
            return data

    def send_data(self, data:str):
        """
        # 發送數據
        :param channel:
        :return:
        """
        self.channel_mq()
        # 使用默認交換機投遞消息,返回TRUE或False
        self.channel.basic_publish(exchange=self.__exchange,
                                  routing_key=self.__rout_key,
                                  body=data,
                                  properties=pika.BasicProperties(delivery_mode=2))

    def close_connect(self):
        """
        # 關閉tcp連接
        :return:
        """
        self.connection.close()

    def close_channel(self, channel):
        """
        # 關閉信道
        :param channel:
        :return:
        """
        if not hasattr(self, 'channel'):
            raise ValueError("the object of SenderClient has not attr of channel.")
        self.channel.close()

# receiver.py
class ReceiverClient(SenderClient):
    """
    接收rabbitmq消息的消費者
    """
    def run(self,queuename=None):
        """
        從mq中接收消息。
        :return:
        """
        if not hasattr(self, 'channel') or self.channel.is_closed:
            self.channel_mq()

        if not queuename: queuename=self.__queue

        # 訂閱消息
        self.channel.basic_consume(self.callback,
                                   queue=queuename,
                                   no_ack=False)
        # 循環等待
        self.channel.start_consuming()


    def callback(self,ch, method, properties, body):
        """
         # 接收消息處理函數
        :param ch:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        print('接收成功!')
        # 發送確認
        ch.basic_ack(delivery_tag=method.delivery_tag)

分析方法

創建一個連接connection

# 添加用戶名和密碼
credentials = pika.PlainCredentials(self.__username, self.__passwd)
# 配置連接參數
parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials)
# 創建一個連接對象
connection = pika.BlockingConnection(parameters)
  • pika.PlainCredentials:一個憑據類
# 該類傳遞的參數
def __init__(self, username, # 用戶名
            password,                   # 密碼
            erase_on_connect=False):    # 是否保存憑據在內存中,如果參數是True,那么該類會在連接完成后刪除
  • pika.ConnectionParameters:TCP連接的參數類
def __init__(self,  
            host=_DEFAULT,  # 默認的ip127.0.0.1
            port=_DEFAULT,  # 端口5672
            virtual_host=_DEFAULT, # 默認的虛擬主機/
            credentials=_DEFAULT,  # 默認的憑據  user:guest   passwd:guest
            # 以下的參數含義可以在配置文件中找到,一般不需要在這里配置
            channel_max=_DEFAULT,
            frame_max=_DEFAULT,
            heartbeat=_DEFAULT,
            ssl=_DEFAULT,
            ssl_options=_DEFAULT,
            connection_attempts=_DEFAULT,
            retry_delay=_DEFAULT,
            socket_timeout=_DEFAULT,
            locale=_DEFAULT,
            backpressure_detection=_DEFAULT,
            blocked_connection_timeout=_DEFAULT,
            client_properties=_DEFAULT,
            tcp_options=_DEFAULT,
            **kwargs):
  • pika.BlockingConnection:創建一個連接類
def __init__(self, parameters=None, _impl_class=None):
# 傳遞一個參數類就可以了,_impl_class只用於測試

創建一個channel

# 創建一個信道
channel = connection.channel()

def channel(self, channel_number=None): # 指定通道的編號,一般讓rabbitmq自動去管理

聲明一個隊列queue

def queue_declare(self, queue='',         # 隊列的名字,默認為空,此時將自動創建一個名字,
                        passive=False,    # 檢查一下隊列是否存在,如果該參數為True,該方法判斷隊列存在否,不會聲明隊列;存在返回queue的狀態;
                        durable=False,    # 隊列持久化參數,默認不持久化
                        exclusive=False,  # 設置獨享隊列,該隊列只被當前的connection使用,如果該tcp關閉了,隊列會被刪除
                        auto_delete=False,# 當最后一個消費者退訂后自動刪除,默認不開啟
                        arguments=None)   # 一個字典,用於隊列傳遞額外的參數

聲明一個交換機exchange

  • channel.exchange_declare
def exchange_declare(self, exchange=None,  # 交換機的名字,為空則自動創建一個名字
                         exchange_type='direct',  # 默認交換機類型為direct
                         passive=False,    # 檢查交換機是否存在,存在返回狀態信息,不存在返回404錯誤
                         durable=False,    # 設置是否持久化
                         auto_delete=False, # 最后一個隊列解綁則刪除
                         internal=False,   # 是否設置為值接收從其他交換機發送過來的消息,不接收生產者的消息
                         arguments=None):  # 一個字典,用於傳遞額外的參數

聲明一個綁定

  • channel.queue_bind:隊列和交換機綁定
def queue_bind(self, queue,         # 隊列的名字
                    exchange,       # 交換機的名字
                    routing_key=None,  # 路由鍵規則,當為None時,默認使用queue的名字作為路由鍵規則
                    arguments=None):   # 一個字典,傳遞額外的參數
# 返回綁定的狀態信息
  • channel.exchange_bind:交換機之間的綁定
def exchange_bind(self, destination=None,  # 目的交換機的名字
                        source=None,       # 源交換機的名字
                        routing_key='',    # 路由鍵規則,當為None時,默認使用queue的名字作為路由鍵規則
                        arguments=None):   # 一個字典,傳遞額外的參數

投遞一條消息

  • channel.basic_publish
def basic_publish(self, exchange,      # 交換機的名字
                        routing_key,   # 路由鍵,吐過交換機是扇形交換機,可以隨便寫
                        body,          # 消息主體
                        properties=None, # 消息的屬性
                        mandatory=False, # 是否設置消息托管
                        immediate=False) # 是否消息實時同步確認,一般和confirm模式配合使用

# properties屬性有一個專門的類來設置
pika.BasicProperties:

def __init__(self, content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None):
    self.content_type = content_type           # 消息的類型,如text/html,json等
    self.content_encoding = content_encoding   # 消息的編碼,如gbk,utf-8等
    self.headers = headers                     # 消息頭,可以和頭交換機約定規則
    self.delivery_mode = delivery_mode         # 消息持久化,2表示持久化,
    self.priority = priority             # 消息的優先權
    self.correlation_id = correlation_id 
    self.reply_to = reply_to
    self.expiration = expiration         # 消息的有效期
    self.message_id = message_id         # 消息iD,自動管理
    self.timestamp = timestamp           # 消息的時間戳
    self.type = type
    self.user_id = user_id
    self.app_id = app_id                 # 發布應用的ID
    self.cluster_id = cluster_id

訂閱消息

方式一:客戶端主動推送消息

  • channel.basic_consume+channel.start_consuming
def basic_consume(self,            # 啟動隊列消費者,告訴服務端開啟一個消費者
                consumer_callback,  # 消費者回調函數
                queue,              # 隊列名稱
                no_ack=False,       # 發送確認,默認開啟消息確認模式,為True是關閉消息確認;如果回調函數中不發送消息確認,消息會一直存在隊列中,等待推送給新連接的消費者
                exclusive=False,   # 設置獨享消費者,不允許其他消費者訂閱該隊列
                consumer_tag=None, # 消費者標簽,如果不指定,系統自動生成
                arguments=None):   # 字典,額外的參數

consumer_callback:回調函數
def consumer_callback(channel,   # 信道 
                    method,     # 一個交付的deliver對象,用來通知客戶端消息
                    properties,  # 消息的屬性,就是消息在發送時定義的屬性
                    body)        # 消息的主題,二進制格式

method:spec.Basic.Deliver:交付對象的屬性

def __init__(self, consumer_tag=None, delivery_tag=None, redelivered=False, exchange=None, routing_key=None):
    self.consumer_tag = consumer_tag  # 消費者標簽,用來標記是哪一個消費者
    self.delivery_tag = delivery_tag  # 交付標簽,用來發送消息確認和標記這是推送給該消費者的第幾條消息
    self.redelivered = redelivered    # bool類型,若果為False表示這是消息第一次被推送,如果是True,表示這是一條被重復推送的消息
    self.exchange = exchange          # 該消息是從哪個交換機上來的
    self.routing_key = routing_key    # 該消息的路由鍵是什么

# 函數中,可以通過

method.consumer_tag/method.delivery_tag/method.redelivered等獲取相應的屬性

注意:推送是異步的,也就是說一次可能推送多條消息,提高性能。

  • start_consuming :開始阻塞等待消息

阻塞等待消息是有時間限制的,超過一定時間內如果沒有新的消息推送過來會強制關閉連接,因此如果需要全時段等待的話需要監聽該連接;

方式二:客戶端主動獲取消息

  • channel.basic_get:同步獲取消息,性能比方式一低
def basic_get(self, 
              queue=None,     # 隊列名稱
              no_ack=False):  # 是否需要開啟確認模式

return method,properties,body
# 需要主動進行消息確認,basic_ack

取消訂閱

channel.basic_cancel:取消某個消費者訂閱
channel.stop_consuming:取消所有的訂閱

訂閱消息確認

  • basic_ack
def basic_ack(self, 
                delivery_tag=0,   # 消息的標記,int類型,一般將回調函數consumer_callback中獲取的交付標記放到這個位置
                multiple=False):  # Flase表示確認單個消息,為True表示確認多個消息

訂閱消息拒絕

  • basic_nack
def basic_nack(self, 
                delivery_tag=None,  # 交付這標記,和basic_ack一樣
                multiple=False,     # Flase表示拒絕單個消息,為True表示拒絕多個消息
                requeue=True)       # True表示拒絕了消息后重新放回隊列,False表示丟棄消息

  • basic_reject:另一個方法,只能拒絕單個消息,沒有multiple參數;

公平調度


- basic_qos:一般在信道聲明的時候使用,確定該信道的預取數,提高性能

def basic_qos(self, 
            prefetch_size=0,   # 設置預取數據包的大小
            prefetch_count=0,  # 設置預取的數量,如果為0則不預取,消費者處理越快,可以將這個這設置的越高
            all_channels=False) # 是否將所有的信道都設置上述參數

投遞消息確認機制

使用AMQP協議的事務方式

  • 有三個方法tx_select,tx_commit,tx_rollback
# 開啟一個事務,在提交事務之前必須先執行此方法
channel.tx_select
# 提交一個事務
channel.tx_select
# 捕捉到異常就使用回滾
channel.tx_rollback

import pika

if __name__ == "__main__":

    # 配置連接參數
    parameters = pika.ConnectionParameters(host=self.__host)
    # 創建一個連接對象
    connection = pika.BlockingConnection(parameters)
    # 創建一個信道
    channel = connection.channel()
    # 聲明隊列
    channel.queue_declare(queue='test',durable=True)
    # 開啟事務
    channel.tx_select()
    try:
        channel.basic_publish(exchange='',
                              routing_key='test',
                              body='hello-world')
        result = 1/0
        channel.tx_commit()
    except:
        channel.tx_rollback()
    # tx_select和tx_commit之間的所有的操作都是事務的一部分
  • 以上的方式是十分消耗rabbitmq的性能的,一般不推薦使用;

confirm 模式

rabbbitmq自帶confirm模式,生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID)。

  • channel.confirm_delivery
import pika

if __name__ == "__main__":
    # 配置連接參數
    parameters = pika.ConnectionParameters(host=HOST)
    # 創建一個連接對象
    connection = pika.BlockingConnection(parameters)
    # 創建一個信道
    channel = connection.channel()
    # 聲明隊列
    channel.queue_declare(queue='test', durable=True)
    # 打開通道的確認模式
    channel.confirm_delivery()
    for i in range(3):
        result = channel.basic_publish(exchange='',
                                routing_key='test',
                                body='hello-world')
        if result:
            break

說明

  1. 當確認模式沒有打開時,即使隊列和交換機不存在,投遞消息返回的都是True;

  2. 當確認模式打開時,投遞失敗會返回False,成功返回True,如果隊列不存在,交換機會叫消息丟掉,但不會通知生產者;如果交換機不存在,會報錯;

  3. 同一個信道,確認模式和事務模式只能存在一個,不能同時啟用,否則報錯;

交換機相互綁定

  • 方法:channel.exchange_bind
import pika

if __name__ == "__main__":
    # 添加用戶名和密碼
    credentials = pika.PlainCredentials(USERNAME, PASSWD)
    # 配置連接參數
    parameters = pika.ConnectionParameters(host=HOST, credentials=credentials)
    # 創建一個連接對象
    connection = pika.BlockingConnection(parameters)
    # 創建一個信道
    channel = connection.channel()
    # 聲明隊列
    channel.queue_declare(queue='test1', durable=True)
    channel.queue_declare(queue='test2', durable=True)
    # 聲明交換機
    channel.exchange_declare('myname')
    channel.exchange_declare('youname')
    # 交換機綁定
    channel.exchange_bind(destination='youname',source='myname',routing_key='ourheart')
    # 隊列綁定
    channel.queue_bind(queue='test1',exchange='myname',routing_key='ourheart')
    channel.queue_bind(queue='test2', exchange='youname',routing_key='ourheart')
    channel.basic_publish(exchange='myname',
                          routing_key='ourheart',
                          body='hello-world')

說明

  1. 交換機相互綁定后,如果他們之間連接的橋routing_key是相同的,向源交換機投遞消息,數據可以到達相同路由鍵的所有的隊列;向目的交換機投遞消息,消息不能到達源交換機;

  2. 交換機設置了internal了True參數后,該交換機不能再接收到生產者發送的消息,但可以得到源交換機發送的消息;

其他方法

exchange_delete: 刪除交換機

queue_delete: 刪除隊列

queue_purge: 清除指定隊列的所有的消息

queue_unbind: 隊列和交換機解除綁定

basic.cancel: 清除消費者

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM