最近在學習項目中的通用技術,其中一個是在項目中會經常使用的基於RabbitMQ實現的RPC。這里一共有三個點要學習,分別是:RPC是什么?RabbitMQ是什么?如何使用RabbitMQ實現RPC。奔着這三個目標,查閱了資料。做筆記記錄。
RPC
rpc的全稱叫:遠程過程調用,可以通俗的理解為通過網絡調用另一台電腦上的函數的業務處理思想。首先,我們先看看本地的函數調用流程是怎樣。
本地調用:
def fun(a,b): sum = a + b return sum if __name__ = __main__ print "i use a function to sum " sum_main = fun(2,3) print sum_main
本地調用當執行到sum=fun(2,3)時,程序會在內存中查找函數指針fun,然后帶着參數進入fun()函數中運算,最后返回給sum_main。如果是遠程調用,則是從一個電腦A上調用另一個電腦B上的函數。
RPC思想的好處是:
1、更符合編程思想。想要實現什么功能直接調用相應的函數,這是編程最直接的思想。
2、減少代碼重復率。A想實現的功能如果B中已經實現了,那么A就直接調用B的函數,避免自己再重復實現。
RPC調用:
rpc多使用http傳輸請求,格式有xml,json等,這里是xml。如下是使用python中自帶的RPC調用框架來實現的一個最簡單的RPC調用。
client.py
from xmlrpclib import ServerProxy #導入xmlrpclib的包 s = ServerProxy("http://172.171.5.205:8080") #定義xmlrpc客戶端 print s.fun_add(2,3) #調用服務器端的函數
server.py
from SimpleXMLRPCServer import SimpleXMLRPCServer def fun_add(a,b): totle = a + b return totle if __name__ == '__main__': s = SimpleXMLRPCServer(('0.0.0.0', 8080)) #開啟xmlrpcserver s.register_function(fun_add) #注冊函數fun_add print "server is online..." s.serve_forever() #開啟循環等待
先啟動服務器端
后啟動客戶端
這樣就完成了一次RPC調用。RPC的調用流程如下圖所示。調用流程是:
- client調用以本地調用方式調用服務;
- client stub接收到調用后負責將方法、參數等組裝成能夠進行網絡傳輸的消息體;
- client stub找到服務地址,並將消息發送到服務端;
- server stub收到消息后進行解碼;
- server stub根據解碼結果調用本地的服務;
- 本地服務執行並將結果返回給server stub;
- server stub將返回結果打包成消息並發送至消費方;
- client stub接收到消息,並進行解碼;
- 服務消費方得到最終結果。
RabbitMQ
RabbitMQ是實現了AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)的軟件。主要功能是
- 解耦服務。使用rabbitmq可以將自個服務解耦,實現模塊化
- 擴展性高。系統中增加一項功能不需要 從頭開始,自需要增加模塊即可
- 解決高並發瓶頸。消息隊列具有緩存消息功能,能夠有效解決高並發請求。
以下摘錄自知乎:
對於初學者,舉一個飯店的例子來解釋這三個分別是什么吧。不是百分百恰當,但是應該足以解釋這三者的區別。 RPC:假設你是一個飯店里的服務員,顧客向你點菜,但是你不會做菜,所以你采集了顧客要點什么之后告訴后廚去做顧客點的菜, 這叫RPC(remote procedure call),因為廚房的廚師相對於服務員而言是另外一個人(在計算機的世界里就是remote的機器上的一個進程)。 廚師做好了的菜就是RPC的返回值。 任務隊列和消息隊列:本質都是隊列,所以就只舉一個任務隊列的例子。假設這個飯店在高峰期顧客很多,而廚師只有很少的幾個, 所以服務員們不得不把單子按下單順序放在廚房的桌子上,供廚師們一個一個做,這一堆單子就是任務隊列(當然,取決於問題的語境, 可能要把放訂單的桌子也算在里面一起構成所謂的任務隊列平台),廚師們每做完一個菜,就從桌子上的訂單里再取出一個單子繼續做菜。
簡單消息隊列:
最簡單的消息隊列,生產者-消費者模式。一端產生消息,發送到隊列,另一端消費者收取消息。
consume_simple.py
1 #coding:UTF-8
2
3 import pika 4 import time 5
6 # 建立實例
7 connection = pika.BlockingConnection(pika.ConnectionParameters( 8 'localhost')) 9 # 聲明管道
10 channel = connection.channel() 11
14 channel.queue_declare(queue='hello') 15
16 def callback(ch, method, properties, body):
17
18 print "ch",ch 19 print "method",method 20 print "properties",properties 21 print "body",body
25 print(" [x] Received %r" % body) 27 # 消費消息
28 channel.basic_consume(
29 callback, # 如果收到消息,就調用callback函數來處理消息
30 queue='hello', # 你要從那個隊列里收消息
33 ) 34
35 print(' [*] Waiting for messages. To exit press CTRL+C') 36 channel.start_consuming() # 開始消費消息
productor_simple.py
1 #coding:UTF-8
2 import pika 3
4 # 建立一個實例
5 connection = pika.BlockingConnection( 6 pika.ConnectionParameters('localhost')
7 ) 8 # 聲明一個管道,在管道里發消息
9 channel = connection.channel() 10 # 在管道里聲明queue
11 channel.queue_declare(queue='hello')
13 channel.basic_publish(exchange='', 14 routing_key='hello', # queue名字
15 body='Hello World!') # 消息內容
16 print(" [x] Sent 'Hello World!'") 17 connection.close() # 隊列關閉
先運行消費者
在運行生產者
觀察消費者獲取的消息
RabbitMQ實現RPC
RPC的要求是等待獲得返回值,而RabbitMQ常出現的場景是異步等待。這就要求RabbitMQ可以立即返回結果。使用了兩種技術:
一、為調用指明id,要求id和結果一起返回,使用id來判斷是哪一個函數的調用返回;
二、指明返回的隊列名,返回結果時指明返回隊列的名字確保會正確返回到調用者。
工作流程:
- 客戶端創建message時指定reply_to隊列名、correlation_id標記調用者。
- 通過隊列,服務端收到消息。調用函數處理,然后返回。
- 返回的隊列是reply_to指定的隊列,並攜帶correlation_id。
- 返回消息到達客戶端,客戶端根據correlation_id判斷是哪一個函數的調用返回。
#coding:UTF-8
import pika import uuid import time class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, # 只要一收到消息就調用on_response no_ack=True, queue=self.callback_queue) # 收這個queue的消息 def on_response(self, ch, method, props, body): # 必須四個參數 # 如果收到的ID和本機生成的相同,則返回的結果就是我想要的指令返回的結果 if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None # 初始self.response為None self.corr_id = str(uuid.uuid4()) # 隨機唯一字符串 self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 發消息到rpc_queue properties=pika.BasicProperties( # 消息持久化 reply_to = self.callback_queue, # 讓服務端命令結果返回到callback_queue correlation_id = self.corr_id, # 把隨機uuid同時發給服務器 ), body=str(n) ) while self.response is None: # 當沒有數據,就一直循環 # 啟動后,on_response函數接到消息,self.response 值就不為空了 self.connection.process_data_events() # 非阻塞版的start_consuming() # print("no msg……") # time.sleep(0.5) # 收到消息就調用on_response return int(self.response) if __name__ == '__main__': fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(7)") response = fibonacci_rpc.call(7) print(" [.] Got %r" % response)
#coding:UTF-8
import pika import time def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish( exchange='', # 把執行結果發回給客戶端 routing_key=props.reply_to, # 客戶端要求返回想用的queue # 返回客戶端發過來的correction_id 為了讓客戶端驗證消息一致性 properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response) ) ch.basic_ack(delivery_tag = method.delivery_tag) # 任務完成,告訴客戶端 if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') # 聲明一個rpc_queue , channel.basic_qos(prefetch_count=1) # 在rpc_queue里收消息,收到消息就調用on_request channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()