z安裝pika模塊:pip install pika
一個最簡單的生產者/消費者:
生產者,send.py:
import pika class SenderClient(object): def __init__(self, username, passwd, host="127.0.0.1", port=5672, queuename='eeg', exchange='eeg', routing_key='eeg'): self.__host = host self.__port = port self.__username = username self.__passwd = passwd self.__queue = queuename self.__exchange = exchange self.__rout_key = routing_key def connect_mq(self): """ 連接mq :return: """ try: # 創建一個連接對象 if not hasattr(self, 'connection') or self.connection.is_closed: # 添加用戶名和密碼 credentials = pika.PlainCredentials(self.__username, self.__passwd) # 配置連接參數 parameters = pika.ConnectionParameters(host=self.__host, port=self.__port, credentials=credentials) self.connection = pika.BlockingConnection(parameters) except Exception as e: print(e) def channel_mq(self): """ # 創建一個信道 # 聲明隊列和交換機和綁定 :return: """ if not hasattr(self, "connection"): self.connect_mq() if not hasattr(self, 'channel') or self.channel.is_closed: self.channel = self.connection.channel() # 聲明一個隊列,durable參數聲明隊列持久化 self.channel.queue_declare(queue=self.__queue, durable=True) self.channel.exchange_declare(exchange=self.__exchange, durable=True) # 交換機和隊列綁定:生產者發布消息時無需綁定,消費者消費消息時需要綁定 self.channel.queue_bind(exchange=self.__exchange, queue=self.__queue, routing_key=self.__rout_key) def open_data(self, filename): """ # 打開一個數據文件 :return: """ with open(filename, 'r', encoding='utf-8') as f: data = f.read() return data def send_data(self, data:str): """ # 發送數據 :param channel: :return: """ self.channel_mq() # 使用默認交換機投遞消息,返回TRUE或False self.channel.basic_publish(exchange=self.__exchange, routing_key=self.__rout_key, body=data, properties=pika.BasicProperties(delivery_mode=2)) def close_connect(self): """ # 關閉tcp連接 :return: """ self.connection.close() def close_channel(self, channel): """ # 關閉信道 :param channel: :return: """ if not hasattr(self, 'channel'): raise ValueError("the object of SenderClient has not attr of channel.") self.channel.close()
消費者,receive.py
from .send import SenderClient class ReceiverClient(SenderClient): """ 接收rabbitmq消息的消費者 """ def run(self,queuename=None): """ 從mq中接收消息。 :return: """ if not hasattr(self, 'channel') or self.channel.is_closed: self.channel_mq() if not queuename: queuename=self.__queue # 訂閱消息 self.channel.basic_consume(self.callback, queue=queuename, no_ack=False) # 循環等待 self.channel.start_consuming() def callback(self,ch, method, properties, body): """ # 接收消息處理函數 :param ch: :param method: :param properties: :param body: :return: """ print('接收成功!') # 發送確認 ch.basic_ack(delivery_tag=method.delivery_tag)
分析方法:
創建一個連接connection:
# 添加用戶名和密碼 credentials = pika.PlainCredentials(self.__username, self.__passwd) # 配置連接參數 parameters = pika.ConnectionParameters(host=self.__host, credentials=credentials) # 創建一個連接對象 connection = pika.BlockingConnection(parameters)
- pika.PlainCredentials:一個憑據類
# 該類傳遞的參數 def __init__(self, username, # 用戶名 password, # 密碼 erase_on_connect=False): # 是否保存憑據在內存中,如果參數是True,那么該類會在連接完成后刪除
- pika.ConnectionParameters:TCP連接的參數類
def __init__(self, host=_DEFAULT, # 默認的ip127.0.0.1 port=_DEFAULT, # 端口5672 virtual_host=_DEFAULT, # 默認的虛擬主機/ credentials=_DEFAULT, # 默認的憑據 user:guest passwd:guest # 以下的參數含義可以在配置文件中找到,一般不需要在這里配置 channel_max=_DEFAULT, frame_max=_DEFAULT, heartbeat=_DEFAULT, ssl=_DEFAULT, ssl_options=_DEFAULT, connection_attempts=_DEFAULT, retry_delay=_DEFAULT, socket_timeout=_DEFAULT, locale=_DEFAULT, backpressure_detection=_DEFAULT, blocked_connection_timeout=_DEFAULT, client_properties=_DEFAULT, tcp_options=_DEFAULT, **kwargs):
- pika.BlockingConnection:創建一個連接類
def __init__(self, parameters=None, _impl_class=None): # 傳遞一個參數類就可以了,_impl_class只用於測試
-
創建一個channel
# 創建一個信道 channel = connection.channel() def channel(self, channel_number=None): # 指定通道的編號,一般讓rabbitmq自動去管理
-
聲明一個隊列queue
def queue_declare(self, queue='', # 隊列的名字,默認為空,此時將自動創建一個名字, passive=False, # 檢查一下隊列是否存在,如果該參數為True,該方法判斷隊列存在否,不會聲明隊列;存在返回queue的狀態; durable=False, # 隊列持久化參數,默認不持久化 exclusive=False, # 設置獨享隊列,該隊列只被當前的connection使用,如果該tcp關閉了,隊列會被刪除 auto_delete=False,# 當最后一個消費者退訂后自動刪除,默認不開啟 arguments=None) # 一個字典,用於隊列傳遞額外的參數
-
聲明一個交換機exchange
def exchange_declare(self, exchange=None, # 交換機的名字,為空則自動創建一個名字 exchange_type='direct', # 默認交換機類型為direct passive=False, # 檢查交換機是否存在,存在返回狀態信息,不存在返回404錯誤 durable=False, # 設置是否持久化 auto_delete=False, # 最后一個隊列解綁則刪除 internal=False, # 是否設置為值接收從其他交換機發送過來的消息,不接收生產者的消息 arguments=None): # 一個字典,用於傳遞額外的參數
-
聲明一個綁定:
- channel.queue_bind:隊列和交換機綁定
def queue_bind(self, queue, # 隊列的名字 exchange, # 交換機的名字 routing_key=None, # 路由鍵規則,當為None時,默認使用queue的名字作為路由鍵規則 arguments=None): # 一個字典,傳遞額外的參數 # 返回綁定的狀態信息
- channel.exchange_bind:交換機之間的綁定
def exchange_bind(self, destination=None, # 目的交換機的名字 source=None, # 源交換機的名字 routing_key='', # 路由鍵規則,當為None時,默認使用queue的名字作為路由鍵規則 arguments=None): # 一個字典,傳遞額外的參數
- channel.queue_bind:隊列和交換機綁定
-
投遞一條消息
def basic_publish(self, exchange, # 交換機的名字 routing_key, # 路由鍵,吐過交換機是扇形交換機,可以隨便寫 body, # 消息主體 properties=None, # 消息的屬性 mandatory=False, # 是否設置消息托管 immediate=False) # 是否消息實時同步確認,一般和confirm模式配合使用 # properties屬性有一個專門的類來設置 pika.BasicProperties: def __init__(self, content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None): self.content_type = content_type # 消息的類型,如text/html,json等 self.content_encoding = content_encoding # 消息的編碼,如gbk,utf-8等 self.headers = headers # 消息頭,可以和頭交換機約定規則 self.delivery_mode = delivery_mode # 消息持久化,2表示持久化, self.priority = priority # 消息的優先權 self.correlation_id = correlation_id self.reply_to = reply_to self.expiration = expiration # 消息的有效期 self.message_id = message_id # 消息iD,自動管理 self.timestamp = timestamp # 消息的時間戳 self.type = type self.user_id = user_id self.app_id = app_id # 發布應用的ID self.cluster_id = cluster_id
-
訂閱消息
-
方式一:客戶端主動推送消息:channel.basic_consume+channel.start_consuming
注意:推送是異步的,也就是說一次可能推送多條消息,提高性能。
- start_consuming :開始阻塞等待消息
阻塞等待消息是有時間限制的,超過一定時間內如果沒有新的消息推送過來會強制關閉連接,因此如果需要全時段等待的話需要監聽該連接;
def basic_consume(self, # 啟動隊列消費者,告訴服務端開啟一個消費者 consumer_callback, # 消費者回調函數 queue, # 隊列名稱 no_ack=False, # 發送確認,默認開啟消息確認模式,為True是關閉消息確認;如果回調函數中不發送消息確認,消息會一直存在隊列中,等待推送給新連接的消費者 exclusive=False, # 設置獨享消費者,不允許其他消費者訂閱該隊列 consumer_tag=None, # 消費者標簽,如果不指定,系統自動生成 arguments=None): # 字典,額外的參數 consumer_callback:回調函數 def consumer_callback(channel, # 信道 method, # 一個交付的deliver對象,用來通知客戶端消息 properties, # 消息的屬性,就是消息在發送時定義的屬性 body) # 消息的主題,二進制格式 method:spec.Basic.Deliver:交付對象的屬性 def __init__(self, consumer_tag=None, delivery_tag=None, redelivered=False, exchange=None, routing_key=None): self.consumer_tag = consumer_tag # 消費者標簽,用來標記是哪一個消費者 self.delivery_tag = delivery_tag # 交付標簽,用來發送消息確認和標記這是推送給該消費者的第幾條消息 self.redelivered = redelivered # bool類型,若果為False表示這是消息第一次被推送,如果是True,表示這是一條被重復推送的消息 self.exchange = exchange # 該消息是從哪個交換機上來的 self.routing_key = routing_key # 該消息的路由鍵是什么 # 函數中,可以通過method.等獲取相應的屬性 method.consumer_tag/method.delivery_tag/method.redelivered
-
方式二:客戶端主動獲取消息:channel.basic_get:同步獲取消息,性能比方式一低,比較少使用
def basic_get(self, queue=None, # 隊列名稱 no_ack=False): # 是否需要開啟確認模式 return method,properties,body # 需要主動進行消息確認,basic_ack
-
-
取消訂閱
channel.basic_cancel():取消某個消費者訂閱 channel.stop_consuming():取消所有的訂閱
-
訂閱消息確認
def basic_ack(self, delivery_tag=0, # 消息的標記,int類型,一般將回調函數consumer_callback中獲取的交付標記放到這個位置 multiple=False): # Flase表示確認單個消息,為True表示確認多個消息
-
訂閱消息拒絕:
-
basic_nack()
-
basic_reject:另一個方法,只能拒絕單個消息,沒有multiple參數;
def basic_nack(self, delivery_tag=None, # 交付這標記,和basic_ack一樣 multiple=False, # Flase表示拒絕單個消息,為True表示拒絕多個消息 requeue=True) # True表示拒絕了消息后重新放回隊列,False表示丟棄消息
-
-
公平調度
- basic_qos:一般在信道聲明的時候使用,確定該信道的預取數,提高性能 def basic_qos(self, prefetch_size=0, # 設置預取數據包的大小 prefetch_count=0, # 設置預取的數量,如果為0則不預取,消費者處理越快,可以將這個這設置的越高 all_channels=False) # 是否將所有的信道都設置上述參數
-
投遞消息確認機制
-
AMQP協議的事務方式,不推薦使用:tx_select,tx_commit,tx_rollback,方式是十分消耗rabbitmq的性能的,一般不推薦使用
# 開啟一個事務,在提交事務之前必須先執行此方法 channel.tx_select() # 提交一個事務 channel.tx_select() # 捕捉到異常就使用回滾 channel.tx_rollback()
-
import pika if __name__ == "__main__": # 配置連接參數 parameters = pika.ConnectionParameters(host=self.__host) # 創建一個連接對象 connection = pika.BlockingConnection(parameters) # 創建一個信道 channel = connection.channel() # 聲明隊列 channel.queue_declare(queue='test',durable=True) # 開啟事務 channel.tx_select() try: channel.basic_publish(exchange='', routing_key='test', body='hello-world') result = 1/0 channel.tx_commit() except: channel.tx_rollback() # tx_select和tx_commit之間的所有的操作都是事務的一部分
-
confirm 模式:channel.confirm_delivery()
-
rabbbitmq自帶confirm模式,生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID)。
import pika if __name__ == "__main__": # 配置連接參數 parameters = pika.ConnectionParameters(host=HOST) # 創建一個連接對象 connection = pika.BlockingConnection(parameters) # 創建一個信道 channel = connection.channel() # 聲明隊列 channel.queue_declare(queue='test', durable=True) # 打開通道的確認模式 channel.confirm_delivery() for i in range(3): result = channel.basic_publish(exchange='', routing_key='test', body='hello-world') if result: break
說明:
-
當確認模式沒有打開時,即使隊列和交換機不存在,投遞消息返回的都是True;
-
當確認模式打開時,投遞失敗會返回False,成功返回True,如果隊列不存在,交換機會叫消息丟掉,但不會通知生產者;如果交換機不存在,會報錯;
- 同一個信道,確認模式和事務模式只能存在一個,不能同時啟用,否則報錯;
-
-
-
- 交換機綁定:channel.exchange_bind()
import pika if __name__ == "__main__": # 添加用戶名和密碼 credentials = pika.PlainCredentials(USERNAME, PASSWD) # 配置連接參數 parameters = pika.ConnectionParameters(host=HOST, credentials=credentials) # 創建一個連接對象 connection = pika.BlockingConnection(parameters) # 創建一個信道 channel = connection.channel() # 聲明隊列 channel.queue_declare(queue='test1', durable=True) channel.queue_declare(queue='test2', durable=True) # 聲明交換機 channel.exchange_declare('myname') channel.exchange_declare('youname') # 交換機綁定 channel.exchange_bind(destination='youname',source='myname',routing_key='ourheart') # 隊列綁定 channel.queue_bind(queue='test1',exchange='myname',routing_key='ourheart') channel.queue_bind(queue='test2', exchange='youname',routing_key='ourheart') channel.basic_publish(exchange='myname', routing_key='ourheart', body='hello-world')
說明
-
交換機相互綁定后,如果他們之間連接的橋routing_key是相同的,向源交換機投遞消息,數據可以到達相同路由鍵的所有的隊列;向目的交換機投遞消息,消息不能到達源交換機;
-
交換機設置了internal了True參數后,該交換機不能再接收到生產者發送的消息,但可以得到源交換機發送的消息;
-
-
其他方法
exchange_delete: 刪除交換機 queue_delete: 刪除隊列 queue_purge: 清除指定隊列的所有的消息 queue_unbind: 隊列和交換機解除綁定 basic.cancel: 清除消費者
參考