pika詳解(二) BlockingConnection
BlockingConnection提供了最通用的連接方式
提供兩個類: BlockingConnection 和 BlockingChannel
class BlockingConnection(object):
def __init__(self, parameters=None, _impl_class=None):
...
BlockingConnection是在pika異步的基礎上提供的阻塞方法, 調用的是 AMQP協議的 Basic.Deliver and Basic.Return
在使用basic_consume接收消息, 使用basic_publish發送消息的時候仍然可以實現異步
為防止遞歸調用或者阻塞, blocking連接/channel 在上下文切換中實現 隊列的asynchronously-delivered事件(異步通知事件), 比如在等待BlockingConnection.channel或 BlockingChannel.queue_declare時, 一旦實現嵌套的上下文, 將會同步(synchronously)調用它們, 這涉及到所有的回調函數:
1.lockingConnection.add_on_connection_blocked_callback,
2.BlockingConnection.add_on_connection_unblocked_callback, 3.BlockingChannel.basic_consume 等
避免死鎖, 一直夯住: 但rabbitmq資源不足的時候, 當去連接rabbitmq的時候, rabbitmq會告訴客戶端Connection.Blocked, 然后rabbitmq會暫停處理連接,直到有資源分配進行處理, 這會影響BlockingConnection和BlockingChannel
比如用戶在basic_publish 使用非發布確認機制下, 遇上rabbitmq暫停處理連接,將會一直阻塞住,用戶回調也不會被執行, 可能引起系統宕機, 解決辦法是:
在BlockingConnection初始化時配置blocked_connection_timeout連接參數
類主要的函數方法及說明:
class BlockingConnection(object):
def __init__(self, parameters=None, _impl_class=None):
"""Create a new instance of the Connection object.
:param None | pika.connection.Parameters | sequence parameters:
Connection parameters instance or non-empty sequence of them. If
None, a `pika.connection.Parameters` instance will be created with
default settings. See `pika.AMQPConnectionWorkflow` for more
details about multiple parameter configurations and retries.
:param _impl_class: for tests/debugging only; implementation class;
None=default
:raises RuntimeError:
"""
def add_on_connection_blocked_callback(self, callback):
回調以便在連接被阻塞(從RabbitMQ接收到Connection.Blocked)時收到通知,
在這種狀態下,RabbitMQ暫停處理傳入數據,直到連接被解除阻塞,
因此接收此通知的發布者暫停發布直到連接被解除阻塞, 可以調用
ConnectionParameters.blocked_connection_timeout 添加超時
def add_on_connection_unblocked_callback(self, callback):
回調,以便在連接被解除阻塞時收到通知
def call_later(self, delay, callback):
pass
def add_callback_threadsafe(self, callback):
"""
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
"""
回調
def remove_timeout(self, timeout_id):
移除超時
def close(self, reply_code=200, reply_text='Normal shutdown'):
reply_code(int) - 關閉的代碼
reply_text(str) - 關閉的文本原因
def process_data_events(self, time_limit=0):
pass
def sleep(self, duration):
延遲
def channel(self, channel_number=None):
建立channel通道 channel_number 整數 要使用的通道編號,默認為下一個可用通道編號
@property
def is_closed(self):
返回bool值
@property
def is_open(self):
返回bool值
@property
def basic_nack_supported(self):
返回bool值 , 指定服務器是否支持活動連接上的basic.nack
@property
def consumer_cancel_notify_supported(self):
返回bool值 服務器是否支持活動連接上的使用者取消通知
@property
def exchange_exchange_bindings_supported(self):
返回bool值 活動連接是否支持交換以交換綁定
@property
def publisher_confirms_supported(self):
返回bool值 活動連接是否可以使用發布者確認
BlockingChannel 通道
創建示例
import pika
# Create our connection object
connection = pika.BlockingConnection()
# The returned object will be a synchronous channel
channel = connection.channel()
參數:
class BlockingChannel(object):
def __init__(self, channel_impl, connection):
pass
@property
def channel_number(self):
"""Channel number"""
頻道號碼
@property
def connection(self):
"""The channel's BlockingConnection instance"""
@property
def is_closed(self):
是否關閉, 返回bool
@property
def is_open(self):
通道是否開啟, 返回bool
def close(self, reply_code=0, reply_text="Normal shutdown"):
關閉
def flow(self, active):
關閉和打開通道流量控制。 active(bool) - 打開流程(True)或關閉(False)
def add_on_cancel_callback(self, callback):
一個回調函數,該函數將在代理發送Basic.Cancel時調用
callback -:callback(method_frame)其中method_frame類型
是pika.frame.Method類型的方法spec.Basic.Cancel
def add_on_return_callback(self, callback):
回調函數,該函數將在發布的消息被拒絕並由服務器通過Basic.Return返回時調用
callback(callable) - 使用callback(channel,method,properties,body),
其中channel:pika.Channel方法:
pika.spec.Basic.Return屬性:pika.spec.BasicProperties body:bytes
def basic_consume(self,queue, on_message_callback, auto_ack=False,
exclusive=False, consumer_tag=None, arguments=None):
queue(str) - 要使用的隊列
on_message_callback(可調用) -用於將消息分派給用戶的必需函數,定義:
on_message_callback(channel, method,properties,body)
channel:BlockingChannel方法:spec.Basic.Deliver屬性:
spec.BasicProperties body:bytes
auto_ack(bool) - 如果設置為True,將使用自動確認模式。
exclusive(bool) - 不允許隊列中的其他消費者
consumer_tag(str) - 您可以指定自己的消費者標簽; 如果留空,將自動生成消費者標簽
arguments(dict) - 消費者的自定義鍵/值對參數
def basic_cancel(self, consumer_tag):
取消消費者
def start_consuming(self):
處理I / O事件並調度計時器和basic_consume 回調,直到取消所有使用者
def stop_consuming(self, consumer_tag=None):
取消所有使用者
- 1
- 2
def consume(self,queue,auto_ack=False,exclusive=False, arguments=None,
inactivity_timeout=None):
阻止隊列消耗而不是通過回調。此方法是一個生成器,
它將每條消息都生成為方法,屬性和正文的元組。當客戶通過BlockingChannel.cancel()
或代理取消使用者時,活動生成器迭代器終止。
參數:
queue(str) - 要使用的隊列名稱
auto_ack(bool) - 告訴代理不要期待ack / nack響應
exclusive(bool) - 不允許隊列中的其他消費者
arguments(dict) - 消費者的自定義鍵/值對參數
inactivity_timeout(float) - 如果給出一個數字(以秒為單位),將導致該方法在給定的
不活動時間后產生(None,None,None); 這允許用戶在等待消息到達時執行偽常規維護活動。
如果給出 None(默認),則該方法將阻塞,直到下一個事件到達
def get_waiting_message_count(self):
返回可以通過BlockingChannel.consume從當前隊列使用者生成器檢索而不阻塞的消息數
def cancel(self):
def basic_ack(self, delivery_tag=0, multiple=False):
確認一條或多條消息。當客戶端發送時,此方法確認通過Deliver或Get-Ok方法傳遞的一條或多條消息。
當由服務器發送時,此方法確認在確認模式下在通道上使用“發布”方法發布的一條或多條消息。
確認可以是針對單個消息或一組消息,包括特定消息。
參數:
delivery-tag(int) - 服務器分配的傳遞標記
multiple(bool) - 如果設置為True,則將傳遞標記視為“最多並包含”,
以便可以使用單個方法確認多個消息。
如果設置為False,則傳遞標記引用單個郵件。如果多個字段為1,並且傳遞標記為零,
則表示確認所有未完成的消息。
def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
方法允許客戶端拒絕一個或多個傳入消息。它可用於中斷和取消大量傳入消息,
或將無法處理的消息返回到其原始隊列。
參數:
delivery-tag(int) - 服務器分配的傳遞標記
multiple(bool) - 如果設置為True,則將傳遞標記視為“最多並包含”,
以便可以使用單個方法確認多個消息。
如果設置為False,則傳遞標記引用單個郵件。如果多個字段為1,並且傳遞標記為零,
則表示確認所有未完成的消息。
requeue(bool) - 如果requeue為true,服務器將嘗試重新排隊該消息。
如果requeue為false或重新排隊嘗試失敗,則丟棄或刪除消息。
def basic_get(self, queue, auto_ack=False):
從AMQP代理獲取單個消息
參數:
queue(str) - 從中獲取消息的隊列名稱
auto_ack(bool) - 告訴經紀人不要期待回復
def basic_publish(self,exchange, routing_key, body, properties=None, mandatory=False):
參數:
exchange(str) - 要發布的交流
routing_key(str) - 要綁定的路由鍵
body(字節) - 消息體; 如果沒有身體,空字符串
properties(pika.spec.BasicProperties) - 消息屬性
mandatory(bool) - 強制性標志
def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
指定服務質量
參數:
prefetch_size(int) - 該字段指定預取窗口大小。如果服務器的大小等於或小於可用的
預取大小(並且也屬於其他預取限制),則它將提前發送消息。
可以設置為零,意味着“沒有特定限制”,
盡管其他預取限制可能仍然適用。如果在使用者中設置了no-ack選項,則忽略prefetch-size。
prefetch_count(int) - 根據整個消息指定預取窗口。該字段可以與預取大小字段結合使用;
如果預取窗口(以及通道和連接級別的窗口)都允許,則只會提前發送消息。
如果在使用者中設置了no-ack選項,則忽略prefetch-count。
global_qos(bool) - QoS是否適用於頻道上的所有消費者
def basic_recover(self, requeue=False):
此方法要求服務器重新傳送指定通道上的所有未確認消息。可以重新傳遞零個或多個消息。
此方法替換異步Recover
def basic_reject(self, delivery_tag=None, requeue=True):
拒絕傳入的消息。此方法允許客戶端拒絕郵件。它可用於中斷和取消大量傳入消息,
或將無法處理的消息返回到其原始隊列。
參數:
delivery-tag(int) - 服務器分配的傳遞標記
requeue(bool) - 如果requeue為true,服務器將嘗試重新排隊該消息。
如果requeue為false或重新排隊嘗試失敗,則丟棄或刪除消息。
def confirm_delivery(self):
啟用RabbitMQ專有的確認模式
def exchange_declare(self,exchange,exchange_type='direct',passive=False,
durable=False,auto_delete=False,internal=False,arguments=None):
聲明交換機
exchange(str) - 交換名稱由這些字符的非空序列組成:
字母,數字,連字符,下划線,句點或冒號。
exchange_type(str) - 要使用的交換類型
passive(bool) - 執行聲明或只是檢查它是否存在
durable(bool) - 重啟RabbitMQ
auto_delete(bool) - 當不再綁定隊列時刪除
internal(布爾) - 只能由其他交易所發布
arguments(dict) - 交換的自定義鍵/值對參數
def exchange_delete(self, exchange=None, if_unused=False):
交換機刪除
def exchange_bind(self, destination, source, routing_key='',arguments=None):
交換機綁定
destination(str) - 要綁定的目標交換
source(str) - 要綁定的源交換
routing_key(str) - 要綁定的路由鍵
arguments(dict) - 綁定的自定義鍵/值對參數
def exchange_unbind(self,destination=None,source=None,routing_key='',
arguments=None):
取消綁定
def queue_declare(self,queue, passive=False, durable=False, exclusive=False,
auto_delete=False, arguments=None):
聲明隊列,
queue(str) - 隊列名稱; 如果為空字符串,則代理將創建唯一的隊列名稱
passive(bool) - 只檢查隊列是否存在,如果不存在則引發 ChannelClosed
durable(bool) - 經紀人重新開始
exclusive(bool) - 僅允許當前連接訪問
auto_delete(bool) - 消費者取消或斷開連接后刪除
arguments(dict) - 隊列的自定義鍵/值參數
def queue_delete(self, queue, if_unused=False, if_empty=False):
刪除隊列
def queue_purge(self, queue):
清除指定隊列中的所有消息 queue清除的隊列的名稱
def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
將隊列綁定到指定的交換
參數:
queue(str) - 綁定到交換的隊列
exchange(str) - 要綁定的源交換
routing_key(str) - 要綁定的路由鍵
arguments(dict) - 綁定的自定義鍵/值對參數
def queue_unbind(self,queue,exchange=None,routing_key=None,arguments=None):
從交換中取消綁定隊列
queue(str) - 從交換中取消綁定的隊列
exchange(str) - 要綁定的源交換
routing_key(str) - 解除綁定的路由鍵
arguments(dict) - 綁定的自定義鍵/值對參數
def tx_select(self):
選擇標准交易模式
def tx_commit(self):
事務提交
def tx_rollback(self):
事務回滾