pika詳解(二) BlockingConnection


pika詳解(二) BlockingConnection

BlockingConnection提供了最通用的連接方式

提供兩個類: BlockingConnection 和 BlockingChannel

class BlockingConnection(object):
	def __init__(self, parameters=None, _impl_class=None):
		...
 

 

BlockingConnection是在pika異步的基礎上提供的阻塞方法, 調用的是 AMQP協議的 Basic.Deliver and Basic.Return

在使用basic_consume接收消息, 使用basic_publish發送消息的時候仍然可以實現異步

為防止遞歸調用或者阻塞, blocking連接/channel 在上下文切換中實現 隊列的asynchronously-delivered事件(異步通知事件), 比如在等待BlockingConnection.channel或 BlockingChannel.queue_declare時, 一旦實現嵌套的上下文, 將會同步(synchronously)調用它們, 這涉及到所有的回調函數:

1.lockingConnection.add_on_connection_blocked_callback,

2.BlockingConnection.add_on_connection_unblocked_callback, 3.BlockingChannel.basic_consume 等

避免死鎖, 一直夯住: 但rabbitmq資源不足的時候, 當去連接rabbitmq的時候, rabbitmq會告訴客戶端Connection.Blocked, 然后rabbitmq會暫停處理連接,直到有資源分配進行處理, 這會影響BlockingConnection和BlockingChannel

比如用戶在basic_publish 使用非發布確認機制下, 遇上rabbitmq暫停處理連接,將會一直阻塞住,用戶回調也不會被執行, 可能引起系統宕機, 解決辦法是:

在BlockingConnection初始化時配置blocked_connection_timeout連接參數

類主要的函數方法及說明:

class BlockingConnection(object):
    def __init__(self, parameters=None, _impl_class=None):
        """Create a new instance of the Connection object.
        :param None | pika.connection.Parameters | sequence parameters:
            Connection parameters instance or non-empty sequence of them. If
            None, a `pika.connection.Parameters` instance will be created with
            default settings. See `pika.AMQPConnectionWorkflow` for more
            details about multiple parameter configurations and retries.
        :param _impl_class: for tests/debugging only; implementation class;
            None=default
        :raises RuntimeError:

        """

    def add_on_connection_blocked_callback(self, callback):
        回調以便在連接被阻塞(從RabbitMQ接收到Connection.Blocked)時收到通知,
        在這種狀態下,RabbitMQ暫停處理傳入數據,直到連接被解除阻塞,
        因此接收此通知的發布者暫停發布直到連接被解除阻塞, 可以調用 
        ConnectionParameters.blocked_connection_timeout 添加超時

    def add_on_connection_unblocked_callback(self, callback):
        回調,以便在連接被解除阻塞時收到通知

    def call_later(self, delay, callback):
        pass

    def add_callback_threadsafe(self, callback):
        """
            connection.add_callback_threadsafe(
            functools.partial(channel.basic_ack, delivery_tag=...))
        """
        回調

    def remove_timeout(self, timeout_id):
        移除超時

    def close(self, reply_code=200, reply_text='Normal shutdown'):
    	reply_code(int) - 關閉的代碼
		reply_text(str) - 關閉的文本原因


    def process_data_events(self, time_limit=0):
        pass

    def sleep(self, duration):
        延遲

    def channel(self, channel_number=None):
        建立channel通道   channel_number 整數 要使用的通道編號,默認為下一個可用通道編號

    @property
    def is_closed(self):
        返回bool值

    @property
    def is_open(self):
        返回bool值

    @property
    def basic_nack_supported(self):
        返回bool值 , 指定服務器是否支持活動連接上的basic.nack

    @property
    def consumer_cancel_notify_supported(self):
        返回bool值   服務器是否支持活動連接上的使用者取消通知

    @property
    def exchange_exchange_bindings_supported(self):
        返回bool值  活動連接是否支持交換以交換綁定
        
    @property
    def publisher_confirms_supported(self):
        返回bool值  活動連接是否可以使用發布者確認
 

 

BlockingChannel 通道

創建示例

import pika

# Create our connection object
connection = pika.BlockingConnection()

# The returned object will be a synchronous channel
channel = connection.channel()
 

 

參數:

class BlockingChannel(object):
    def __init__(self, channel_impl, connection):
        pass
 

 

    @property
    def channel_number(self):
        """Channel number"""
        頻道號碼

 

    @property
    def connection(self):
        """The channel's BlockingConnection instance"""
 
    @property
    def is_closed(self):
        是否關閉, 返回bool
 
    @property
    def is_open(self):
        通道是否開啟, 返回bool
 
    def close(self, reply_code=0, reply_text="Normal shutdown"):
        關閉
 
    def flow(self, active):
    關閉和打開通道流量控制。   active(bool) - 打開流程(True)或關閉(False)
 
    def add_on_cancel_callback(self, callback):
    	一個回調函數,該函數將在代理發送Basic.Cancel時調用
  		 callback -:callback(method_frame)其中method_frame類型
   		是pika.frame.Method類型的方法spec.Basic.Cancel
 
    def add_on_return_callback(self, callback):
    		回調函數,該函數將在發布的消息被拒絕並由服務器通過Basic.Return返回時調用
   		 callback(callable) - 使用callback(channel,method,properties,body),
   		 其中channel:pika.Channel方法:
   		 pika.spec.Basic.Return屬性:pika.spec.BasicProperties body:bytes
 

 

    def basic_consume(self,queue, on_message_callback, auto_ack=False, 
    								exclusive=False, consumer_tag=None, arguments=None):
    
    		queue(str) - 要使用的隊列
		    on_message_callback(可調用) -用於將消息分派給用戶的必需函數,定義:		
		                    on_message_callback(channel,	method,properties,body)

			channel:BlockingChannel方法:spec.Basic.Deliver屬性:
							spec.BasicProperties body:bytes
			auto_ack(bool) - 如果設置為True,將使用自動確認模式。
			exclusive(bool) - 不允許隊列中的其他消費者
			consumer_tag(str) - 您可以指定自己的消費者標簽; 如果留空,將自動生成消費者標簽
			arguments(dict) - 消費者的自定義鍵/值對參數
 

    def basic_cancel(self, consumer_tag):
    取消消費者
 

 

    def start_consuming(self):
   			 處理I / O事件並調度計時器和basic_consume 回調,直到取消所有使用者
 

 

    def stop_consuming(self, consumer_tag=None):
   		 取消所有使用者
 
  • 1
  • 2
    def consume(self,queue,auto_ack=False,exclusive=False, arguments=None, 
    						inactivity_timeout=None):
    		阻止隊列消耗而不是通過回調。此方法是一個生成器,
    		它將每條消息都生成為方法,屬性和正文的元組。當客戶通過BlockingChannel.cancel()
    		或代理取消使用者時,活動生成器迭代器終止。
    
	    
		參數:	
			queue(str) - 要使用的隊列名稱
			auto_ack(bool) - 告訴代理不要期待ack / nack響應
			exclusive(bool) - 不允許隊列中的其他消費者
			arguments(dict) - 消費者的自定義鍵/值對參數
			inactivity_timeout(float) - 如果給出一個數字(以秒為單位),將導致該方法在給定的
			不活動時間后產生(None,None,None); 這允許用戶在等待消息到達時執行偽常規維護活動。
			如果給出 None(默認),則該方法將阻塞,直到下一個事件到達
 

 

    def get_waiting_message_count(self):
    返回可以通過BlockingChannel.consume從當前隊列使用者生成器檢索而不阻塞的消息數
 

 

    def cancel(self):
 
    def basic_ack(self, delivery_tag=0, multiple=False):
    
    	確認一條或多條消息。當客戶端發送時,此方法確認通過Deliver或Get-Ok方法傳遞的一條或多條消息。
    	當由服務器發送時,此方法確認在確認模式下在通道上使用“發布”方法發布的一條或多條消息。
    	確認可以是針對單個消息或一組消息,包括特定消息。

		參數:	
			delivery-tag(int) - 服務器分配的傳遞標記
			multiple(bool) - 如果設置為True,則將傳遞標記視為“最多並包含”,
			以便可以使用單個方法確認多個消息。
			如果設置為False,則傳遞標記引用單個郵件。如果多個字段為1,並且傳遞標記為零,
			則表示確認所有未完成的消息。
 

 

    def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
    		方法允許客戶端拒絕一個或多個傳入消息。它可用於中斷和取消大量傳入消息,
    		或將無法處理的消息返回到其原始隊列。

	參數:	
			delivery-tag(int) - 服務器分配的傳遞標記
			multiple(bool) - 如果設置為True,則將傳遞標記視為“最多並包含”,
			以便可以使用單個方法確認多個消息。
			如果設置為False,則傳遞標記引用單個郵件。如果多個字段為1,並且傳遞標記為零,
			則表示確認所有未完成的消息。
			requeue(bool) - 如果requeue為true,服務器將嘗試重新排隊該消息。
			如果requeue為false或重新排隊嘗試失敗,則丟棄或刪除消息。
 

 

    def basic_get(self, queue, auto_ack=False):
    	從AMQP代理獲取單個消息

	參數:	
		queue(str) - 從中​​獲取消息的隊列名稱
		auto_ack(bool) - 告訴經紀人不要期待回復
 

 

    def basic_publish(self,exchange, routing_key, body, properties=None, mandatory=False):
   	 參數:	
			exchange(str) - 要發布的交流
			routing_key(str) - 要綁定的路由鍵
			body(字節) - 消息體; 如果沒有身體,空字符串
			properties(pika.spec.BasicProperties) - 消息屬性
			mandatory(bool) - 強制性標志
 

 

 

    def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
    		指定服務質量
    參數:	
			prefetch_size(int) - 該字段指定預取窗口大小。如果服務器的大小等於或小於可用的
			預取大小(並且也屬於其他預取限制),則它將提前發送消息。
			可以設置為零,意味着“沒有特定限制”,
			盡管其他預取限制可能仍然適用。如果在使用者中設置了no-ack選項,則忽略prefetch-size。
			prefetch_count(int) - 根據整個消息指定預取窗口。該字段可以與預取大小字段結合使用; 
			如果預取窗口(以及通道和連接級別的窗口)都允許,則只會提前發送消息。
			如果在使用者中設置了no-ack選項,則忽略prefetch-count。
			global_qos(bool) - QoS是否適用於頻道上的所有消費者
 

 

 

    def basic_recover(self, requeue=False):
    此方法要求服務器重新傳送指定通道上的所有未確認消息。可以重新傳遞零個或多個消息。
    此方法替換異步Recover
 

 

    def basic_reject(self, delivery_tag=None, requeue=True):
    	拒絕傳入的消息。此方法允許客戶端拒絕郵件。它可用於中斷和取消大量傳入消息,
    	或將無法處理的消息返回到其原始隊列。

	參數:	
		delivery-tag(int) - 服務器分配的傳遞標記
		requeue(bool) - 如果requeue為true,服務器將嘗試重新排隊該消息。
		如果requeue為false或重新排隊嘗試失敗,則丟棄或刪除消息。
 

 

    def confirm_delivery(self):
    	啟用RabbitMQ專有的確認模式
 

 

    def exchange_declare(self,exchange,exchange_type='direct',passive=False,
    	durable=False,auto_delete=False,internal=False,arguments=None):
    
    聲明交換機
	 	exchange(str) - 交換名稱由這些字符的非空序列組成:
	 						字母,數字,連字符,下划線,句點或冒號。
		exchange_type(str) - 要使用的交換類型
		passive(bool) - 執行聲明或只是檢查它是否存在
		durable(bool) - 重啟RabbitMQ
		auto_delete(bool) - 當不再綁定隊列時刪除
		internal(布爾) - 只能由其他交易所發布
		arguments(dict) - 交換的自定義鍵/值對參數
 

 

    def exchange_delete(self, exchange=None, if_unused=False):
	交換機刪除
 

 

    def exchange_bind(self, destination, source, routing_key='',arguments=None):
	交換機綁定
		
	destination(str) - 要綁定的目標交換
	source(str) - 要綁定的源交換
	routing_key(str) - 要綁定的路由鍵
	arguments(dict) - 綁定的自定義鍵/值對參數
 
	
    def exchange_unbind(self,destination=None,source=None,routing_key='',
    arguments=None):
    	取消綁定
 

 

    def queue_declare(self,queue, passive=False, durable=False, exclusive=False,
    					auto_delete=False, arguments=None):
    聲明隊列,
   		 queue(str) - 隊列名稱; 如果為空字符串,則代理將創建唯一的隊列名稱
		passive(bool) - 只檢查隊列是否存在,如果不存在則引發 ChannelClosed
		durable(bool) - 經紀人重新開始
		exclusive(bool) - 僅允許當前連接訪問
		auto_delete(bool) - 消費者取消或斷開連接后刪除
		arguments(dict) - 隊列的自定義鍵/值參數
 

 

    def queue_delete(self, queue, if_unused=False, if_empty=False):
    刪除隊列
 

 

    def queue_purge(self, queue):
    清除指定隊列中的所有消息   queue清除的隊列的名稱
 

 

    def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
		將隊列綁定到指定的交換
	
	參數:	
		queue(str) - 綁定到交換的隊列
		exchange(str) - 要綁定的源交換
		routing_key(str) - 要綁定的路由鍵
		arguments(dict) - 綁定的自定義鍵/值對參數
 

 

    def queue_unbind(self,queue,exchange=None,routing_key=None,arguments=None):
		從交換中取消綁定隊列
		queue(str) - 從交換中取消綁定的隊列
		exchange(str) - 要綁定的源交換
		routing_key(str) - 解除綁定的路由鍵
		arguments(dict) - 綁定的自定義鍵/值對參數
 

 

    def tx_select(self):
	選擇標准交易模式
    def tx_commit(self):
	事務提交
    def tx_rollback(self):
    事務回滾
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM