概述
RabbitMQ是一種消息隊列,它接收並轉發消息。
官方例子:可以把RabbitMQ視為一個郵局,將要發布的郵件放在郵箱中,通過郵遞員傳遞給收件人。
但是又有區別二者:RabbitMQ不處理,只做接收,存儲和轉發數據消息的中間介質
實現最簡單的隊列通信
producer_sender.py
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import pika 5 6 credentials = pika.PlainCredentials('admin', 'admin123456') 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.13', credentials=credentials)) 8 # 建立通道 9 channel = connection.channel() 10 # 如果將消息發送到不存在的位置,RabbitMQ會刪除該消息,創建消息隊列叫hello 11 channel.queue_declare(queue='hello') 12 # 發消息了,在RabbitMQ中永遠無法將消息直接發送到隊列中,它始終需要進行交換(不理解這是為毛線?) 13 # 在這里使用空字符串標識的默認交換,准確的將指定消息放入隊列中routing_key來指定 14 channel.basic_publish(exchange='', 15 routing_key='hello', 16 body='Hello World!') 17 18 print('[x]發送hello word') 19 # 在關閉連接之前,需要確保緩沖區已刷新消息是否已經傳到消息隊列中 20 connection.close()
consumer_recv.py
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import pika 5 6 credentials = pika.PlainCredentials('admin', 'admin123456') 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.13', credentials=credentials)) 8 channel = connection.channel() 9 # 在此再次申明一次是因為當我們不確定運行哪個程序,在這種情況下,最好在兩個程序中重復聲明隊列 10 channel.queue_declare('hello') 11 12 13 def callback(ch, method, properties, body): 14 """收消息原理:向隊列定義一個回調函數,無論何時接收消息,都由Pika庫調用""" 15 print('[x] Received %r' % body) 16 print(ch) 17 print(method) 18 print(properties) 19 20 21 # 告訴rabbitmq這個特定的回調函數應該接收來自hello隊列的消息 22 channel.basic_consume(queue='hello', 23 auto_ack=True, 24 on_message_callback=callback) 25 print(' [*] Waiting for messages.') 26 # 這里只要消費者一直存在通道之上,就一直死循環,源碼當中有說明 27 channel.start_consuming()
這里注意首次運行要報錯,報錯內容如下(因為我在這里是遠程連接需要認證證書)
在此時就要看下源碼參數設置
1 def __init__( # pylint: disable=R0913,R0914 2 self, 3 host=_DEFAULT, # 默認'localhost' 4 port=_DEFAULT, # 5672 5 virtual_host=_DEFAULT, # 使用rabbitmq虛擬主機,源碼中還做了一次判斷 6 if virtual_host is not self._DEFAULT: 7 self.virtual_host = virtual_host 8 credentials=_DEFAULT, # auth憑證 9 channel_max=_DEFAULT, # 允許的最大的通道數 10 frame_max=_DEFAULT, # AMQP幀的最大字節大小 數據鏈路層的最小傳輸單位稱為幀 為什么要有幀?就是為了保證數據的可靠傳輸把數據封裝成幀了 11 heartbeat=_DEFAULT, # 默認是None 12 ssl_options=_DEFAULT, # 默認是None 13 connection_attempts=_DEFAULT, #最大重試連接次數默認1次 14 retry_delay=_DEFAULT, # 在幾秒鍾內就要等待,在之后的那一刻就不等待了 默認2.0秒 15 socket_timeout=_DEFAULT, # socket連接超時 默認10s 16 stack_timeout=_DEFAULT, # (TCP/[SSL]/AMQP)協議棧超時 建議要比socket_timeout大 默認15s 17 locale=_DEFAULT, # 默認'en_US' 18 blocked_connection_timeout=_DEFAULT, #默認None 如果不是None,連接保持阻塞(由連接觸發)。阻止代理);如果超時在連接解除阻塞之前過期,連接將被斷開 19 client_properties=_DEFAULT, # 默認None 或dict的客戶端屬性使用覆蓋報告給的默認客戶端屬性中的字段RabbitMQ通過“Connection.StartOk”方法。 20 tcp_options=_DEFAULT, # 默認None 為套接字設置一個或沒有TCP選項的dict 21 **kwargs): 22 """Create a new ConnectionParameters instance. See `Parameters` for 23 default values. 24 25 :param str host: Hostname or IP Address to connect to 26 :param int port: TCP port to connect to 27 :param str virtual_host: RabbitMQ virtual host to use 28 :param pika.credentials.Credentials credentials: auth credentials 29 :param int channel_max: Maximum number of channels to allow 30 :param int frame_max: The maximum byte size for an AMQP frame 31 :param int|None|callable heartbeat: Controls AMQP heartbeat timeout negotiation 32 during connection tuning. An integer value always overrides the value 33 proposed by broker. Use 0 to deactivate heartbeats and None to always 34 accept the broker's proposal. If a callable is given, it will be called 35 with the connection instance and the heartbeat timeout proposed by broker 36 as its arguments. The callback should return a non-negative integer that 37 will be used to override the broker's proposal. 38 :param `pika.SSLOptions`|None ssl_options: None for plaintext or 39 `pika.SSLOptions` instance for SSL/TLS. Defaults to None. 40 :param int connection_attempts: Maximum number of retry attempts 41 :param int|float retry_delay: Time to wait in seconds, before the next 42 :param int|float socket_timeout: Positive socket connect timeout in 43 seconds. 44 :param int|float stack_timeout: Positive full protocol stack 45 (TCP/[SSL]/AMQP) bring-up timeout in seconds. It's recommended to 46 set this value higher than `socket_timeout`. 47 :param str locale: Set the locale value 48 :param int|float|None blocked_connection_timeout: If not None, 49 the value is a non-negative timeout, in seconds, for the 50 connection to remain blocked (triggered by Connection.Blocked from 51 broker); if the timeout expires before connection becomes unblocked, 52 the connection will be torn down, triggering the adapter-specific 53 mechanism for informing client app about the closed connection: 54 passing `ConnectionBlockedTimeout` exception to on_close_callback 55 in asynchronous adapters or raising it in `BlockingConnection`. 56 :param client_properties: None or dict of client properties used to 57 override the fields in the default client properties reported to 58 RabbitMQ via `Connection.StartOk` method. 59 :param tcp_options: None or a dict of TCP options to set for socket 60 """
默認是連接的localhost主機,需要從新增加用戶並設置權限即可
運行結果
1 D:\python\python.exe F:/abc/messagequeue/consumer_recv.py 2 [*] Waiting for messages. 3 [x] Received b'Hello World!' 4 <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x00000171F0AF17B8> params=<ConnectionParameters host=192.168.1.13 port=5672 virtual_host=/ ssl=False>>>> 5 <Basic.Deliver(['consumer_tag=ctag1.1830cf91aad94871b0ed054af1f71e3d', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])> 6 <BasicProperties>