名詞解釋
ConnectionFactory
: 與 RabbitMQ
服務器連接的管理器。
Connection
: 與 RabbitMQ
服務器的連接。
Channel
: 與 Exchange
的連接。
Exchange
: 接收生產者的消息,並根據消息的 RoutingKey
和 Exchange
綁定的 BindingKey
分配消息。
Queue
: 存儲消費者的消息。
RoutingKey
: 指定當前消息被誰接收。
BindingKey
: 指定當前 Exchange
下,什么樣的 RoutingKey
會被下派到當前綁定的 Queue
中。
exchange_declare
方法詳解
def exchange_declare(self, callback=None, exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None):
callback
如果 nowait=True
且 Exchange.DeclareOk
時,調用這個回調方法。
exchange
交換器名稱。
exchange_type
交換器類型,常見的如 fanout
direct
topic
。
passive
如果為 true
, 則執行聲明或者檢查交換器是否存在。
durable
設置是否持久化。durable
設置為 true
表示持久化,反之非持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失消息。
auto_delete
設置是否自動刪除。auto_delete
設置為 true
則表示自動刪除。自動刪除的前提是至少有一個隊列或者交換器綁定,之后所有與這個交換器綁定的隊列或者交換器都與此解綁。注意不能錯誤地把這個參數理解為:"當與此交換器連接的客戶端都斷開時,RabbitMQ 會自動刪除本交換器"。
internal
設置是否是內置的。如果設置為 true
,則表示是內置的交換器,客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式。
nowait
如果設置為 false
, 則不期望 RabbitMQ
服務器有一個 Exchange.DeclareOk
這樣響應。
arguments
其他一些結構化參數。比如:alternate-exchange
。
exchange_bind
方法詳解
def exchange_bind(self, callback=None, destination=None, source=None, routing_key='', nowait=False, arguments=None):
callback
如果 nowait=True
且 Exchange.DeclareOk
時,調用這個回調方法。
destinaction
一種交換器。
source
消息從 source
交換器轉發到 destination
交換器,從某種程度上看 destination
交換器可以是一種隊列。
routing_key
用來綁定隊列和交換器的路由鍵。
nowait
如果設置為 false
, 則不期望 RabbitMQ
服務器有一個 Exchange.DeclareOk
這樣響應。
arguments
其他一些結構化參數。
queue_declare
方法詳解
def queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None):
queue
隊列名稱。
passive
如果為 true
, 則執行聲明或者檢查隊列是否存在。
durable
設置是否持久化。為 true
則設置隊列為持久化。持久化的隊列會存盤,在服務器重啟的時候可以保證不丟失相關的信息。
exclusive
設置是否排他。為 true
則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
- 排他隊列是基於連接(Connection)可見的,同一個連接的不同信道(Channel)是可以同時訪問同一個連接創建的排他隊列。
- "首次" 是指如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列,這個與普通隊列不同。
- 即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景。
auto_delete
設置是否自動刪除。為 true
則設置隊列為自動刪除。自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。不能把這個參數錯誤的理解為:"當連接到此隊列的所有客戶端斷開時,這個隊列自動刪除",因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列。
arguments
設置隊列的其他一些參數,如 x-message-ttl、x-expires、x-max-length
。
queue_bind
方法詳解
def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
queue
隊列名稱。
exchange
交換器的名稱。
routing_key
用來綁定隊列和交換器的路由鍵。
arguments
定義綁定的一些參數。
basic_publish
方法詳解
def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False, immediate=False):
exchange
交換器的名稱,指明消息需要發送到哪個交換器。如果設置為空字符串,則消息會被發生到 RabbitMQ
默認的交換器中。
routing_key
路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中。
properties
消息的基本屬性集,其包含 14 個屬性成員,分別有 contentType
、deliveryMode
、proiotity
等。
body
消息體 payload
,真正需要發送的消息。
mandatory
當參數設置為 true
時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列,那么 RabbitMQ
會調用 Basic.Return
命令消息返回給生產者。當參數設置為 false
時,出現上述情況,則消息直接丟失。
immediate
當參數設置為 true
時,如果交換器在將消息路由到隊列時發現隊列上並不存在任何消費者,那么這條消息將不會存入隊列中。當與路由鍵匹配的所有隊列都沒有消費者時,該消息會通過 Basic.Return
返回至生產者。
basic_consume
方法詳解
def basic_consume(self, consumer_callback, queue='', no_ack=False, exclusive=False, consumer_tag=None, arguments=None):
consumer_callback
設置消費者的回調函數。用來處理 RabbitMQ
推送過來的消息,比如:DefaultConsumer,使用時需要客戶端重寫其中的方法。
queue
隊列的名稱。
no_ack
設置是否自動確認。建議設置成 false
,即不自動確認。
exclusive
設置是否排他。
consumer_tag
消費者標簽,用來區分多個消費者。
arguments
設置消費者的其他參數。
exchange_type
模式
fanout
模式
- 任何發送到
fanout exchange
的消息都會被轉發到與 exchange
綁定的所有的 queue
上。
- 不需要指定
routing_key
, 即使指定了也是無效的。
- 需要提前將
exchange
與 queue
進行綁定, 一個 exchange
可以綁定到多個 queue
, 一個 queue
也可以同多個 exchange
進行綁定。
- 接收到消息的
exchange
沒有與任何 queue
綁定, 則消息就會被拋棄。
direct
模式
- 發送到
direct exchange
的消息都會被轉發到 routing_key
中指定的 queue
。
- 不需要將
exchange
進行任何綁定操作。當然也可以進行綁定操作,可以將不同的 routing key
與不同的 queue
進行綁定,不同的 queue
與不同 exchange
進行綁定。
- 消息需要傳遞一個
routing_key
。
- 如果消息中不存在
routing_key
中綁定的隊列名,則該消息就會被拋棄。
- 如果一個
exchange
聲明為 direct
,並且綁定中指定了 routing_key
,那么發生消息時。需要同時指明該 exchange
和 routing_key
。
topic
模式
- 這種模式比較復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個
routing_key
,exchange
會將消息轉發到所有關注主題能與 routing_key
模糊匹配的隊列。
- 需要
routing_key
要提前綁定 exchange
與 queue
。
- 在進行綁定的時候,要提供一個該隊列關心的主題,如:"#.log.#" 表示該隊列關心所涉及 log 的消息(一個 routing_key 為 "MQ.log.error" 的消息會被轉發到該隊列)。
- "#" 表示 0 個或若干個關鍵字,"" 表示一個關鍵字。如 "log." 能與 "log.warn.timeout" 匹配,但是 "log.#" 能與上述匹配。
- 如果
exchange
沒有發現能夠與 routing_key
匹配的 Queue
,則會拋棄此消息。