Python操作RabbitMQ


RabbitMQ介紹

RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現的產品,RabbitMQ是一個消息代理,從“生產者”接收消息並傳遞消息至“消費者”,期間可根據規則路由、緩存、持久化消息。“生產者”也即message發送者以下簡稱P,相對應的“消費者”乃message接收者以下簡稱C,message通過queue由P到C,queue存在於RabbitMQ,可存儲盡可能多的message,多個P可向同一queue發送message,多個C可從同一個queue接收message

  • 內部架構:

  • 說明

    • Message (消息):RabbitMQ 轉發的二進制對象,包括Headers(頭)、Properties (屬性)和 Data (數據),其中數據部分不是必要的。Producer(生產者): 消息的生產者,負責產生消息並把消息發到交換機
    • Exhange的應用。

      • Consumer (消費者):使用隊列 Queue 從 Exchange 中獲取消息的應用。
      • Exchange (交換機):負責接收生產者的消息並把它轉到到合適的隊列
      • Queue (隊列):一個存儲Exchange 發來的消息的緩沖,並將消息主動發送給Consumer,或者 Consumer 主動來獲取消息。見 1.4 部分的描述。

      • Binding (綁定):隊列 和 交換機 之間的關系。Exchange 根據消息的屬性和 Binding 的屬性來轉發消息。綁定的一個重要屬性是 binding_key。

      • Connection (連接)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連接。一些應用需要多個connection,為了節省TCP 連接,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連接的連接。連接需要用戶認證,並且支持 TLS (SSL)。連接需要顯式關閉。

Python操作RabbitMQ

1.實現簡單消息隊列:

一個Product向queue發送一個message,一個Client從該queue接收message並打印

  • 發消息 product
import pika   

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定義連接池
channel = connection.channel()          
channel.queue_declare(queue='test')    #聲明隊列以向其發送消息消息
channel.basic_publish(exchange='', routing_key='test', body='Hello World!')  #注意當未定義exchange時,routing_key需和queue的值保持一致
print('send success msg to rabbitmq')
connection.close()   #關閉連接

 

  • 收消息,client
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)


channel.basic_consume(callback,queue='test',no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

執行效果:

#product端:
send success msg to rabbitmq

#client端:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'

 

  • 消息確認

當客戶端從隊列中取出消息之后,可能需要一段時間才能處理完成,如果在這個過程中,客戶端出錯了,異常退出了,而數據還沒有處理完成,那么非常不幸,這段數據就丟失了,因為rabbitmq默認會把此消息標記為已完成,然后從隊列中移除,
消息確認是客戶端從rabbitmq中取出消息,並處理完成之后,會發送一個ack告訴rabbitmq,消息處理完成,當rabbitmq收到客戶端的獲取消息請求之后,或標記為處理中,當再次收到ack之后,才會標記為已完成,然后從隊列中刪除。當rabbitmq檢測到客戶端和自己斷開鏈接之后,還沒收到ack,則會重新將消息放回消息隊列,交給下一個客戶端處理,保證消息不丟失,也就是說,RabbitMQ給了客戶端足夠長的時間來做數據處理。

在客戶端使用no_ack來標記是否需要發送ack,默認是False,開啟狀態

product向rabbitmq發送兩條消息:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定義連接池
channel = connection.channel()          #聲明隊列以向其發送消息消息

channel.queue_declare(queue='test')
channel.basic_publish(exchange='', routing_key='test', body='1')
channel.basic_publish(exchange='', routing_key='test', body='2')
channel.basic_publish(exchange='', routing_key='test', body='3')
print('send success msg to rabbitmq')
connection.close()   #關閉連接

 

客戶端接受消息,不發送ack

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(5)
    #ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息




channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

執行結果,發現消息並沒有從隊列中刪除

第一次執行:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'
 [x] Received b'2'
 [x] Received b'3'
第二次執行:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'
 [x] Received b'2'
 [x] Received b'3'

 

加入ack之后:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(5)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息




channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

運行結果:發現第二次運行隊列中已經沒有消息

第一次:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'
 [x] Received b'2'
 [x] Received b'3
 第二次:
  [*] Waiting for messages. To exit press CTRL+C

 

  • 改變消息獲取順序

默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列
默認情況:使用product往隊列中放10個數字

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定義連接池
channel = connection.channel()          #聲明隊列以向其發送消息消息

channel.queue_declare(queue='test')
for i in range(10):
    channel.basic_publish(exchange='', routing_key='test', body=str(i))
    print('send success msg[%s] to rabbitmq' %i)
connection.close()   #關閉連接

 


運行結果;
send success msg[1] to rabbitmq
send success msg[2] to rabbitmq
send success msg[3] to rabbitmq
send success msg[4] to rabbitmq
send success msg[5] to rabbitmq
send success msg[6] to rabbitmq
send success msg[7] to rabbitmq
send success msg[8] to rabbitmq
send success msg[9] to rabbitmq

 

客戶端1收消息:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    #time.sleep(5)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息




channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 


運行結果:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'0'
 [x] Received b'2'
 [x] Received b'4'
 [x] Received b'6'
 [x] Received b'8'

 

客戶端2收消息:和client1的區別是加了一個sleep(1)

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息
    
channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 



執行結果:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'
 [x] Received b'3'
 [x] Received b'5'
 [x] Received b'7'
 [x] Received b'9'

 

在兩個客戶端里加入channel.basic_qos(prefetch_count=1)參數

客戶端1:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
   ##time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息

channel.basic_qos(prefetch_count=1)  #添加不按順序分配消息的參數
channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 


執行效果:
[*] Waiting for messages. To exit press CTRL+C
 [x] Received b'0'
 [x] Received b'2'
 [x] Received b'3'
 [x] Received b'4'
 [x] Received b'5'
 [x] Received b'6'
 [x] Received b'7'
 [x] Received b'8'
 [x] Received b'9'

 

客戶端2:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息



channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='test',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 



執行結果:
[*] Waiting for messages. To exit press CTRL+C
 [x] Received b'1'

 

發現,加入channel.basic_qos(prefetch_count=1)參數之后,客戶端2由於sleep了1s,所以只拿到了一個消息,其他的消息都被client1拿到了

  • 消息持久化 消息確認機制使得客戶端在崩潰的時候,服務端消息不丟失,但是如果rabbitmq奔潰了呢?該如何保證隊列中的消息不丟失? 此就需要product在往隊列中push消息的時候,告訴rabbitmq,此隊列中的消息需要持久化,用到的參數:durable=True,再次強調,Producer和client都應該去創建這個queue,盡管只有一個地方的創建是真正起作用的:
channel.basic_publish(exchange='',  
                      routing_key="test",  
                      body=message,  
                      properties=pika.BasicProperties(  
                         delivery_mode = 2, # make message persistent  
                      ))  

 

具體代碼:

product端:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定義連接池
channel = connection.channel()          #聲明隊列以向其發送消息消息

channel.queue_declare(queue='test_persistent',durable=True)
for i in range(10):
    channel.basic_publish(exchange='', routing_key='test_persistent', body=str(i),properties=pika.BasicProperties(delivery_mode=2))
    print('send success msg[%s] to rabbitmq' %i)
connection.close()   #關閉連接

 

client端:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.queue_declare(queue='test_persistent',durable=True)


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    #time.sleep(5)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息



channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='test_persistent',no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息
注意:client端也需配置durable=True,否則將報下面錯誤:

pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - parameters for queue 'test_persistent' in vhost '/' not equivalent")

 

配置完之后,發現product往rabbitmq端push消息之后,重啟rabbitmq,消息依然存在

[root@dns ~]# rabbitmqctl list_queues
Listing queues ...
abc 0
abcd    0
hello2  300
test    0
test1   20
test_persistent 10
...done.
[root@dns ~]# /etc/init.d/rabbitmq-server restart
Restarting rabbitmq-server: SUCCESS
rabbitmq-server.
[root@dns ~]# rabbitmqctl list_queues
Listing queues ...
abc 0
abcd    0
hello2  300
test1   20
test_persistent 10
...done.

 

參考文檔:參考文檔:http://www.rabbitmq.com/tutorials/tutorial-two-python.html

2.使用Exchanges:

exchanges主要負責從product那里接受push的消息,根據product定義的規則,投遞到queue中,是product和queue的中間件

  • Exchange 類型

    • direct 關鍵字類型
    • topic 模糊匹配類型
    • fanout 廣播類型
  • 使用fanout實現發布訂閱者模型

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中
訂閱者:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test123',type='fanout')  #定義一個exchange ,類型為fanout
rest = channel.queue_declare(exclusive=True)   #創建一個隨機隊列,並啟用exchange
queue_name = rest.method.queue          #獲取隊列名
channel.queue_bind(exchange='test123',queue=queue_name)   #將隨機隊列名和exchange進行綁定


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息



channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

發布者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定義連接池
channel = connection.channel()          #聲明隊列以向其發送消息消息

channel.exchange_declare(exchange='test123',type='fanout')
for i in range(10):
    channel.basic_publish(exchange='test123', routing_key='', body=str(i),properties=pika.BasicProperties(delivery_mode=2))
    print('send success msg[%s] to rabbitmq' %i)
connection.close()   #關閉連接

 

注意:
需先定義訂閱者,啟動訂閱者,否則發布者publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,msg是被丟棄的。

  • 使用direct 實現根據關鍵字發布消息

消息發布訂閱者模型是發布者發布一條消息,所有訂閱者都可以收到,現在rabbitmq還支持根據關鍵字發送,在發送消息的時候使用routing_key參數指定關鍵字,rabbitmq的exchange會判斷routing_key的值,然后只將消息轉發至匹配的隊列,注意,此時需要訂閱者先創建隊列

配置參數為exchange的type=direct,然后定義routing_key即可

訂閱者1: 訂閱error,warning,info

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test321',type='direct')  #定義一個exchange ,類型為fanout
rest = channel.queue_declare(exclusive=True)   #創建一個隨機隊列,並啟用exchange
queue_name = rest.method.queue          #獲取隊列名

severities = ['error','warning','info']   #定義三個routing_key

for severity in severities:
    channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息


channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

訂閱者2:訂閱error,warning

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test321',type='direct')  #定義一個exchange ,類型為fanout
rest = channel.queue_declare(exclusive=True)   #創建一個隨機隊列,並啟用exchange
queue_name = rest.method.queue          #獲取隊列名

severities = ['error','warning']   #定義兩個routing_key

for severity in severities:
    channel.queue_bind(exchange='test321', routing_key=severity,queue=queue_name)


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息



channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

發布者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定義連接池
channel = connection.channel()          #聲明隊列以向其發送消息消息

channel.exchange_declare(exchange='test321',type='direct')
channel.basic_publish(exchange='test321', routing_key='info', body='info msg',properties=pika.BasicProperties(delivery_mode=2))  #發送info msg到 info routing_key
channel.basic_publish(exchange='test321', routing_key='error', body='error msg',properties=pika.BasicProperties(delivery_mode=2)) #發送error msg到 error routing_key

print('send success msg[] to rabbitmq')
connection.close()   #關閉連接**

 

效果:發現訂閱者1和訂閱者2都收到error消息,但是只有訂閱者1收到了info消息

訂閱者1:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'info msg'
 [x] Received b'error msg'
訂閱者2:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'error msg'

 

  • 使用topic實現模糊匹配發布消息

direct實現了根據自定義的routing_key來標示不同的queue,使用topic可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列

# 表示可以匹配 0 個 或 多個 單詞
*  表示只能匹配 一個 單詞

如:
fuzj.test 和fuzj.test.test
fuzj.# 會匹配到 fuzj.test 和fuzj.test.test
fuzj.* 只會匹配到fuzj.test

訂閱者1: 使用#匹配

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test333',type='topic')  #定義一個exchange ,類型為fanout
rest = channel.queue_declare(exclusive=True)   #創建一個隨機隊列,並啟用exchange
queue_name = rest.method.queue          #獲取隊列名

channel.queue_bind(exchange='test333', routing_key='test.#',queue=queue_name)


def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息



channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

訂閱者2:使用*匹配

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672))
channel = connection.channel()

channel.exchange_declare(exchange='test333',type='topic')  #定義一個exchange ,類型為fanout
rest = channel.queue_declare(exclusive=True)   #創建一個隨機隊列,並啟用exchange
queue_name = rest.method.queue          #獲取隊列名

channel.queue_bind(exchange='test333', routing_key='test.*',queue=queue_name)

def callback(ch, method, properties, body):
    '''回調函數,處理從rabbitmq中取出的消息'''
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #發送ack消息



channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue=queue_name,no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()    #開始監聽 接受消息

 

發布者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))     #定義連接池
channel = connection.channel()          #聲明隊列以向其發送消息消息

channel.exchange_declare(exchange='test333',type='topic')
channel.basic_publish(exchange='test333', routing_key='test.123', body='test.123 msg',properties=pika.BasicProperties(delivery_mode=2))
channel.basic_publish(exchange='test333', routing_key='test.123.321', body=' test.123.321 msg',properties=pika.BasicProperties(delivery_mode=2))

print('send success msg[] to rabbitmq')
connection.close()   #關閉連接

 

輸出效果:

訂閱者1:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test.123 msg'
 [x] Received b' test.123.321 msg'

訂閱者2:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'test.123 msg'

 

  • 實現RPC

  • 過程:

    • 客戶端 Client 設置消息的 routing key 為 Service 的隊列 op_q,設置消息的 reply-to 屬性為返回的 response 的目標隊列 reponse_q,設置其 correlation_id 為以隨機UUID,然后將消息發到 exchange。比如channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)

    • Exchange 將消息轉發到 Service 的 op_q

    • Service 收到該消息后進行處理,然后將response 發到 exchange,並設置消息的 routing_key 為原消息的 reply_to 屬性,以及設置其 correlation_id 為原消息的 correlation_id 。

      ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))

    • Exchange 將消息轉發到 reponse_q

    • Client 逐一接受 response_q 中的消息,檢查消息的 correlation_id 是否為等於它發出的消息的correlation_id,是的話表明該消息為它需要的response。

  • 代碼實現:

    • 服務端:
import pika
import subprocess
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1', port=5672, ))       #定義連接池

channel = connection.channel()    #創建通道

channel.queue_declare(queue='rpc_queue')            #創建rpc_queue隊列

def operating(arg):
    p = subprocess.Popen(arg, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)   #執行系統命令
    res = p.stdout.read()       #取出標准輸出
    if not res:                 #判斷是否有執行結果
        responses_msg = p.stderr.read()         #沒有執行結果則取出標准錯誤輸出
    else:
        responses_msg = res
    return responses_msg

def on_request(ch, method, props, body):
    command = str(body,encoding='utf-8')
    print(" [.] start Processing command : %s" % command)
    response_msg = operating(body)          #調用函數執行命令
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = props.correlation_id),body=str(response_msg))
    ch.basic_ack(delivery_tag = method.delivery_tag)


channel.basic_qos(prefetch_count=1)       #消息不平均分配,誰取誰得
channel.basic_consume(on_request, queue='rpc_queue')    #監聽隊列

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

  • 客戶端
import pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1',port=5672,))     #定義連接池

        self.channel = self.connection.channel()        #創建通道

        result = self.channel.queue_declare(exclusive=True,auto_delete=True)  #創建客戶端短接受服務端回應消息的隊列,\exclusive=True表示只隊列只允許當前鏈接進行連接,auto_delete=True表示自動刪除
        self.callback_queue = result.method.queue     #獲取隊列名稱

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)            #從隊列中獲取消息

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:     #判斷
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,   #回應消息的隊列
                                         correlation_id = self.corr_id, #correlation id可以理解為請求的唯一標識碼
                                         ),
                                   body=str(n))
        while self.response is None:        #不斷從自己監聽的隊列里取消息,直到取到消息
            self.connection.process_data_events()
        return self.response.decode()

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting server" )
time.sleep(0.1)
while True:
    command = input('>> ')
    response = fibonacci_rpc.call(command)
    print(" [.] Get %r \n" % response)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM