遠程調用方法:R(remote) P(procedure) C(call)
為了說明如何使用RPC服務,我們將創建一個簡單的客戶端類。
它將公開一個名為call的方法,它發送一個RPC請求和塊,直到收到響應。
注:可以實現多消費端訪問 , 它會通過 uuid匹配 循環進行指定的處理對應。
rpc的實現
如圖我們可以看出生產端client向消費端server請求處理數據,他會經歷如下幾次來完成交互。
- 1.生產端 生成rpc_queue隊列,這個隊列負責幫消費者 接收數據並把消息發給消費端。
- 2.生產端 生成另外一個隨機隊列,這個隊列是發給消費端,消費這個用這個隊列把處理好的數據發送給生產端。
- 3.生產端 生成一組唯一字符串UUID,發送給消費者,消費者會把這串字符作為驗證在發給生產者。
- 4.當消費端處理完數據,發給生產端,時會把處理數據與UUID一起通過隨機生產的隊列發回給生產端。
- 5.生產端,會使用while循環 不斷檢測是否有數據,並以這種形式來實現阻塞等待數據,來監聽消費端。
- 6.生產端獲取數據調用回調函數,回調函數判斷本機的UUID與消費端發回UID是否匹配,由於消費端,可能有多個,且處理時間不等所以需要判斷,判斷成功賦值數據,while循環就會捕獲到,完成交互。

server 消費端
#_*_coding:utf-8_*_ import pika import time # 鏈接socket connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 生成rpc queue channel.queue_declare(queue='rpc_queue') # 斐波那契數列 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) # 收到消息就調用 # ch 管道內存對象地址 # method 消息發給哪個queue # props 返回給消費的返回參數 # body數據對象 def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) # 調用斐波那契函數 傳入結果 response = fib(n) ch.basic_publish(exchange='', # 生產端隨機生成的queue routing_key=props.reply_to, # 獲取UUID唯一 字符串數值 properties=pika.BasicProperties(correlation_id = \ props.correlation_id), # 消息返回給生產端 body=str(response)) # 確保任務完成 ch.basic_ack(delivery_tag = method.delivery_tag) # rpc_queue收到消息:調用on_request回調函數 # queue='rpc_queue'從rpc內收 channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
Clinet 生產端
import pika import uuid import time # 斐波那契數列 前兩個數相加依次排列 class FibonacciRpcClient(object): def __init__(self): # 鏈接遠程 self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() # 生成隨機queue result = self.channel.queue_declare(exclusive=True) # 隨機取queue名字,發給消費端 self.callback_queue = result.method.queue # self.on_response 回調函數:只要收到消息就調用這個函數。 # 聲明收到消息后就 收queue=self.callback_queue內的消息 self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 收到消息就調用 # ch 管道內存對象地址 # method 消息發給哪個queue # body數據對象 def on_response(self, ch, method, props, body): # 判斷本機生成的ID 與 生產端發過來的ID是否相等 if self.corr_id == props.correlation_id: # 將body值 賦值給self.response self.response = body def call(self, n): # 賦值變量,一個循環值 self.response = None # 隨機一次唯一的字符串 self.corr_id = str(uuid.uuid4()) # routing_key='rpc_queue' 發一個消息到rpc_queue內 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( # 執行命令之后結果返回給self.callaback_queue這個隊列中 reply_to = self.callback_queue, # 生成UUID 發送給消費端 correlation_id = self.corr_id, ), # 發的消息,必須傳入字符串,不能傳數字 body=str(n)) # 沒有數據就循環收 while self.response is None: # 非阻塞版的start_consuming() # 沒有消息不阻塞 self.connection.process_data_events() print("no msg...") time.sleep(0.5) return int(self.response) # 實例化 fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(6) print(" [.] Got %r" % response)