RPC原理及其調用過程


遠程過程調用,簡稱為RPC,是一個計算機通信協議,它允許運行於一台計算機的程序調用另一台計算機的子程序,而無需額外地為這個交互作用編程。

RPC與傳統的HTTP對比

優點:

  1. 傳輸效率高(二進制傳輸)

  2. 發起調用的一方無需知道RPC的具體實現,如同調用本地函數般調用

缺點:

  1. 通用性不如HTTP好(HTTP是標准協議)

總結:RPC適合內部服務間的通信調用;HTTP適合面向用戶與服務間的通信調用

 

RPC調用過程如下:

 

  1. 調用者(客戶端Client)以本地調用的方式發起調用;
  2. Client stub(客戶端存根)收到調用后,負責將被調用的方法名、參數等打包編碼成特定格式的能進行網絡傳輸的消息體;
  3. Client stub將消息體通過網絡發送給服務端;
  4. Server stub(服務端存根)收到通過網絡接收到消息后按照相應格式進行拆包解碼,獲取方法名和參數;
  5. Server stub根據方法名和參數進行本地調用;
  6. 被調用者(Server)本地調用執行后將結果返回給server stub;
  7. Server stub將返回值打包編碼成消息,並通過網絡發送給客戶端;
  8. Client stub收到消息后,進行拆包解碼,返回給Client;
  9. Client得到本次RPC調用的最終結果。

 

對於RPC調用流程的實現,拋開調用方與被調用方,其核心主要是:消息協議傳輸控制的實現 

(1). RPC消息協議:客戶端調用的參數和服務端的返回值這些在網絡上傳輸的數據以何種方式打包編碼和拆包解碼

 RPC的消息協議在設計時主要要考慮,消息轉換及傳輸的效率,為解決消息轉換及傳輸的效率,可以以二進制的方式傳輸消息,使用原始的二進制傳輸可以省去中間轉換的環節並減少傳輸的數據量。

在Python中可以使用struct模塊對二進制進行編碼和解碼:

struct.pact將其它類型轉換為二進制,通常用於消息長度的轉換:

import struct # 將整數2轉換成適用網絡傳輸的無符號的4個字節整數
>>> struct.pack('!I', 2) '\x00\x00\x00\x02'

 struct.unpack將二進制類型轉換為其它類型:

byte_data = '\x00\x00\x00\x02'
# 將2對應的二進制 轉換為十進制整數 返回結果是個元組
>>> struct.unpack('!I',byte_data) (2,)

 

 (2). RPC傳輸控制

 對於消息數據的傳輸,主要有HTTP傳輸和TCP傳輸,鑒於TCP傳輸的可靠性,RPC的傳輸一般使用TCP作為傳輸協議

在TCP傳輸中客戶端與服務端通過socket進行通信,通信流程:

 

RPC使用TCP進行傳輸控制的實現

 1 import socket
 2 
 3 class Channel(object):
 4     """
 5     與客戶端建立網絡連接
 6     """
 7 
 8     def __init__(self, host, port):
 9         self.host = host  # 服務器地址
10         self.port = port  # 服務器端口
11 
12     def get_connection(self):
13         """
14         獲取一個tcp連接
15         """
16         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
17         sock.connect((self.host, self.port))
18         return sock
19 
20 
21 class Server(object):
22     def __init__(self, host, port, handlers):
23         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
24         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
25         self.host = host
26         self.port = port
27         self.sock.bind((host, port))
28         self.handlers = handlers
29 
30     def serve(self):
31         """
32         開啟服務器運行,提供RPC服務
33         """
34         # 開啟服務監聽,等待客戶端連接
35         self.sock.listen(128)
36         print("開始監聽")
37         while True:
38             # 接收客戶端的連接請求
39             conn, addr = self.sock.accept()
40             print("建立連接{}".format(str(addr)))
41 
42             # 創建ServerStub對象,完成客戶端具體的RPC調用
43             stub = ServerStub(conn, self.handlers)
44             try:
45                 while True:
46                     stub.process()
47             except EOFError:
48                 # 表示客戶端關閉了連接
49                 print("客戶端關閉連接")
50             # 關閉服務端連接
51             conn.close()

 

通過Socket使得RPC客戶端與服務器進行通信

 1 TCP服務端
 2 sock = socket.socket()  # 創建一個套接字
 3 sock.bind()  # 綁定端口
 4 sock.listen()  # 監聽連接
 5 sock.accept()  # 接受新連接
 6 sock.close()  # 關閉服務器套接字
 7 
 8 
 9 TCP客戶端
10 sock = socket.socket()  # 創建一個套接字
11 sock.connect()  # 連接遠程服務器
12 sock.recv() #
13 sock.send()  #
14 sock.sendall()  # 完全寫
15 sock.close()  # 關閉

 

RPC服務的實現:為了能讓RPC服務器同時處理多個客戶端的請求,提升性能,可以采用多線程、多進程方式實現,下面分別介紹這兩種方式實現的RPC服務

多線程RPC服務:

 1 class ThreadServer(object):
 2     """
 3    多線程RPC服務器
 4     """
 5 
 6     def __init__(self, host, port, handlers):
 7         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 8         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 9         self.host = host
10         self.port = port
11         self.sock.bind((host, port))
12         self.handlers = handlers
13 
14     def serve(self):
15         """
16         開始服務
17         """
18         self.sock.listen(128)
19         print("開始監聽")
20         while True:
21             conn, addr = self.sock.accept()
22             print("建立鏈接{}".format(str(addr)))
23             t = threading.Thread(target=self.handle, args=(conn,))
24             t.start()
25 
26     def handle(self, client):
27         stub = ServerStub(client, self.handlers)
28         try:
29             while True:
30                 stub.process()
31         except EOFError:
32             print("客戶端關閉連接")
33 
34         client.close()

多進程RPC服務:

 1 class MultiProcessServer(object):
 2     """
 3    多進程服務器
 4     """
 5 
 6     def __init__(self, host, port, handlers):
 7         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 8         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 9         self.host = host
10         self.port = port
11         self.sock.bind((host, port))
12         self.handlers = handlers
13 
14     def serve(self):
15         """
16         開始服務
17         """
18         self.sock.listen(128)
19         print("開始監聽")
20         while True:
21             conn, addr = self.sock.accept()
22             print("建立鏈接{}".format(str(addr)))
23             t = Process(target=self.handle, args=(conn,))
24             t.start()
25 
26     def handle(self, client):
27         stub = ServerStub(client, self.handlers)
28         try:
29             while True:
30                 stub.process()
31         except EOFError:
32             print("客戶端關閉連接")
33 
34         client.close()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM