3.RPC引入
上篇回顧:萬物互聯之~深入篇
Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/
其他專欄最新篇:協程加強之~兼容答疑篇 | 聊聊數據庫~SQL環境篇
3.1.概念
RPC
(Remote Procedure Call
):分布式系統常見的一種通信方法(遠程過程調用),通俗講:可以一台計算機的程序調用另一台計算機的子程序(可以把它看成之前我們說的進程間通信,只不過這一次的進程不在同一台PC上了)
PS:RPC
的設計思想是力圖使遠程調用中的通訊細節對於使用者透明,調用雙方無需關心網絡通訊的具體實現
引用一張網上的圖:
和HTTP
有點相似,你可以這樣理解:
- 老版本的
HTTP/1.0
是短鏈接,而RPC
是長連接進行通信- HTTP協議(header、body),RPC可以采取HTTP協議,也可以自定義二進制格式
- 后來
HTTP/1.1
支持了長連接(Connection:keep-alive
),基本上和RPC
差不多了- 但
keep-alive
一般都限制有最長時間,或者最多處理的請求數,而RPC
是基於長連接的,基本上沒有這個限制
- 但
- 后來谷歌直接基於
HTTP/2.0
建立了gRPC
,它們之間的基本上也就差不多了- 如果硬是要區分就是:
HTTP-普通話
和RPC-方言
的區別了 - RPC高效而小眾,HTTP效率沒RPC高,但更通用
- 如果硬是要區分就是:
- PS:
RPC
和HTTP
調用不用經過中間件,而是端到端的直接數據交互- 網絡交互可以理解為基於
Socket
實現的(RPC
、HTTP
都是Socket
的讀寫操作)
- 網絡交互可以理解為基於
簡單概括一下RPC
的優缺點就是:
- 優點:
- 效率更高(可以自定義二進制格式)
- 發起RPC調用的一方,在編寫代碼時可忽略RPC的具體實現(跟編寫本地函數調用一般)
- 缺點:
- 通用性不如HTTP(方言普及程度肯定不如普通話),如果傳輸協議不是HTTP協議格式,調用雙方就需要專門實現通信庫
PS:HTTP更多是Client
與Server
的通訊;RPC
更多是內部服務器間的通訊
3.2.引入
上面說這么多,可能還沒有來個案例實在,我們看個案例:
本地調用sum()
:
def sum(a, b):
"""return a+b"""
return a + b
def main():
result = sum(1, 2)
print(f"1+2={result}")
if __name__ == "__main__":
main()
輸出:(這個大家都知道)
1+2=3
1.xmlrpc案例
官方文檔:
https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html
都說RPC
用起來就像本地調用一樣,那么用起來啥樣呢?看個案例:
服務端:(CentOS7:192.168.36.123:50051
)
from xmlrpc.server import SimpleXMLRPCServer
def sum(a, b):
"""return a+b"""
return a + b
# PS:50051是gRPC默認端口
server = SimpleXMLRPCServer(('', 50051))
# 把函數注冊到RPC服務器中
server.register_function(sum)
print("Server啟動ing,Port:50051")
server.serve_forever()
客戶端:(Win10:192.168.36.144
)
from xmlrpc.client import ServerProxy
stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")
輸出:(Client
用起來是不是和本地差不多?就是通過代理訪問了下RPCServer
而已)
1+2=3
PS:CentOS
服務器不是你綁定個端口就一定能訪問的,如果不能記讓防火牆開放對應的端口
這個之前在說MariaDB
環境的時候有詳細說:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4
# 添加 --permanent永久生效(沒有此參數重啟后失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent
2.ZeroRPC案例:
zeroRPC用起來和這個差不多,也簡單舉個例子吧:
把服務的某個方法注冊到RPCServer
中,供外部服務調用
import zerorpc
class Test(object):
def say_hi(self, name):
return f"Hi,My Name is{name}"
# 注冊一個Test的實例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()
調用服務端代碼:
import zerorpc
client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)
3.3.簡單版自定義RPC
看了上面的引入案例,是不是感覺RPC
不過如此?NoNoNo,要是真這么簡單也就談不上RPC架構
了,上面兩個是最簡單的RPC服務了,可以這么說:生產環境基本上用不到,只能當案例練習罷了,對Python來說,最常用的RPC就兩個gRPC
and Thrift
PS:國產最出名的是Dubbo
and Tars
,Net最常用的是gRPC
、Thrift
、Surging
1.RPC服務的流程
要自己實現一個RPC Server
那么就得了解整個流程了:
Client
(調用者)以本地調用的方式發起調用- 通過
RPC
服務進行遠程過程調用(RPC的目標就是要把這些步驟都封裝起來,讓使用者感覺不到這個過程)- 客戶端的
RPC Proxy
組件收到調用后,負責將被調用的方法名、參數
等打包編碼成自定義的協議 - 客戶端的
RPC Proxy
組件在打包完成后通過網絡把數據包發送給RPC Server
- 服務端的
RPC Proxy
組件把通過網絡接收到的數據包按照相應格式進行拆包解碼
,獲取方法名和參數 - 服務端的
RPC Proxy
組件根據方法名和參數進行本地調用 RPC Server
(被調用者)本地執行后將結果返回給服務端的RPC Proxy
- 服務端的
RPC Proxy
組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy
組件 - 客戶端的
RPC Proxy
組件收到數據包后,進行拆包解碼,把數據返回給Client
- 客戶端的
Client
(調用者)得到本次RPC
調用的返回結果
用一張時序圖來描述下整個過程:
PS:RPC Proxy
有時候也叫Stub
(存根):(Client Stub,Server Stub)
為屏蔽客戶調用遠程主機上的對象,必須提供某種方式來模擬本地對象,這種本地對象稱為存根(stub),存根負責接收本地方法調用,並將它們委派給各自的具體實現對象
PRC服務實現的過程中其實就兩核心點:
- 消息協議:客戶端調用的參數和服務端的返回值這些在網絡上傳輸的數據以何種方式打包編碼和拆包解碼
- 經典代表:
Protocol Buffers
- 經典代表:
- 傳輸控制:在網絡中數據的收發傳輸控制具體如何實現(
TCP/UDP/HTTP
)
2.手寫RPC
下面我們就根據上面的流程來手寫一個簡單的RPC:
1.Client調用:
# client.py
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
輸出:
1+2=3
1.1+2=3.1
Wed Jan 16 22
2.Client Stub,客戶端存根:(主要有打包
、解包
、和RPC服務器通信
的方法)
# client_stub.py
import socket
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def convert(self, obj):
"""根據類型轉換成對應的類型編號"""
if isinstance(obj, int):
return 1
if isinstance(obj, float):
return 2
if isinstance(obj, str):
return 3
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議
格式:func:函數名@params:類型-參數,類型2-參數2...
"""
result = f"func:{func}"
if args:
params = ""
# params:類型-參數,類型2-參數2...
for item in args:
params += f"{self.convert(item)}-{item},"
# 去除最后一個,
result += f"@params:{params[:-1]}"
# print(result) # log 輸出
return result.encode("utf-8")
def unpack(self, data):
"""解包:獲取返回結果"""
msg = data.decode("utf-8")
# 格式應該是"data:xxxx"
params = msg.split(":")
if len(params) > 1:
return params[1]
return None
def get(self, func, args=None):
"""1.客戶端的RPC Proxy組件收到調用后,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
data = self.pack(func, args)
# 2.客戶端的RPC Proxy組件在打包完成后通過網絡把數據包發送給RPC Server
self.socket.send(data)
# 等待服務端返回結果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
簡要說明下:(我根據流程在Code里面標注了,看起來應該很輕松)
之前有說到核心其實就是消息協議
and傳輸控制
,我客戶端存根
的消息協議是自定義的格式(后面會說簡化方案):func:函數名@params:類型-參數,類型2-參數2...
,傳輸我是基於TCP進行了簡單的封裝
3.Server端:(實現很簡單)
# server.py
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服務端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口復用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 綁定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客戶端連接
client_socket, client_addr = self.socket.accept()
print(f"來自{client_addr}的請求:\n")
# 交給服務端存根(Server Proxy)處理
self.server_stub.handle(client_socket, client_addr)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server啟動ing,Port:50051")
server.run()
為了簡潔,服務端代碼我單獨放在了server_code.py
中:
# 5.RPC Server(被調用者)本地執行后將結果返回給服務端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
4.然后再看看重頭戲Server Stub
:
# server_stub.py
import socket
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def convert(self, num, obj):
"""根據類型編號轉換類型"""
if num == "1":
obj = int(obj)
if num == "2":
obj = float(obj)
if num == "3":
obj = str(obj)
return obj
def unpack(self, data):
"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
msg = data.decode("utf-8")
# 格式應該是"格式:func:函數名@params:類型編號-參數,類型編號2-參數2..."
array = msg.split("@")
func = array[0].split(":")[1]
if len(array) > 1:
args = list()
for item in array[1].split(":")[1].split(","):
temps = item.split("-")
# 類型轉換
args.append(self.convert(temps[0], temps[1]))
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
# 格式:"data:返回值"
return f"data:{result}".encode("utf-8")
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 如果沒有這個方法則返回None
func = getattr(self.mycode, func, None)
if args:
return func(*args) # 解包
else:
return func() # 無參函數
def handle(self, client_socket, client_addr):
while True:
# 獲取客戶端發送的數據包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 執行無參函數
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 執行帶參函數
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy組件
data = self.pack(data) # 把函數執行結果按指定協議打包
# 把處理過的數據發送給客戶端
client_socket.send(data)
else:
print(f"客戶端:{client_addr}已斷開\n")
break
再簡要說明一下:里面方法其實主要就是解包
、執行函數
、返回值打包
輸出圖示:
再貼一下上面的時序圖:
課外拓展:
HTTP1.0、HTTP1.1 和 HTTP2.0 的區別
https://www.cnblogs.com/heluan/p/8620312.html
簡述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994
分布式基礎—RPC
http://www.dataguru.cn/article-14244-1.html
4.RPC簡化與提煉
上篇回顧:萬物互聯之~RPC專欄 https://www.cnblogs.com/dunitian/p/10279946.html
上節課解答
之前有網友問,很多開源的RPC中都是使用路由表,這個怎么實現?
其實路由表實現起來也簡單,代碼基本上不變化,就修改一下server_stub.py
的__init__
和exe
兩個方法就可以了:
class ServerStub(object):
def __init__(self, mycode):
self.func_dict = dict()
# 初始化一個方法名和方法的字典({func_name:func})
for item in mycode.__dir__():
if not item.startswith("_"):
self.func_dict[item] = getattr(mycode, item)
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 如果沒有這個方法則返回None
# func = getattr(self.mycode, func, None)
func = self.func_dict[func]
if args:
return func(*args) # 解包
else:
return func() # 無參函數
4.1.Json序列化
Python比較6的同志對上節課的Code肯定嗤之以鼻,上次自定義協議是同的通用方法,這節課我們先來簡化下代碼:
再貼一下上節課的時序圖:
1.Json知識點
官方文檔:https://docs.python.org/3/library/json.html
# 把字典對象轉換為Json字符串
json_str = json.dumps({"func": func, "args": args})
# 把Json字符串重新變成字典對象
data = json.loads(data)
func, args = data["func"], data["args"]
需要注意的就是類型轉換了(eg:python tuple
==> json array
)
Python | JSON |
---|---|
dict | object |
list, tuple | array |
str | string |
int, float | number |
True | true |
False | false |
None | null |
PS:序列化:json.dumps(obj)
,反序列化:json.loads(json_str)
2.消息協議采用Json格式
在原有基礎上只需要修改下Stub
的pack
和unpack
方法即可
Client_Stub(類型轉換都省掉了)
import json
import socket
class ClientStub(object):
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議
格式:{"func": "sum", "args": [1, 2]}
"""
json_str = json.dumps({"func": func, "args": args})
# print(json_str) # log 輸出
return json_str.encode("utf-8")
def unpack(self, data):
"""解包:獲取返回結果"""
data = data.decode("utf-8")
# 格式應該是"{data:xxxx}"
data = json.loads(data)
# 獲取不到就返回None
return data.get("data", None)
# 其他Code我沒有改變
Server Stub()
import json
import socket
class ServerStub(object):
def unpack(self, data):
"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
data = data.decode("utf-8")
# 格式應該是"格式:{"func": "sum", "args": [1, 2]}"
data = json.loads(data)
func, args = data["func"], data["args"]
if args:
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
# 格式:"data:返回值"
json_str = json.dumps({"data": result})
return json_str.encode("utf-8")
# 其他Code我沒有改變
輸出圖示:
4.2.Buffer序列化
RPC其實更多的是二進制的序列化方式,這邊簡單介紹下
1.pickle知識點
官方文檔:https://docs.python.org/3/library/pickle.html
用法和Json
類似,PS:序列化:pickle.dumps(obj)
,反序列化:pickle.loads(buffer)
2.簡單案例
和Json案例類似,也只是改了pack
和unpack
,我這邊就貼一下完整代碼(防止被吐槽)
1.Client
# 和上一節一樣
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
2.ClientStub
import socket
import pickle
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議"""
return pickle.dumps((func, args))
def unpack(self, data):
"""解包:獲取返回結果"""
return pickle.loads(data)
def get(self, func, args=None):
"""1.客戶端的RPC Proxy組件收到調用后,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
data = self.pack(func, args)
# 2.客戶端的RPC Proxy組件在打包完成后通過網絡把數據包發送給RPC Server
self.socket.send(data)
# 等待服務端返回結果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
3.Server
# 和上一節一樣
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服務端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口復用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 綁定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客戶端連接
client_socket, client_addr = self.socket.accept()
print(f"來自{client_addr}的請求:\n")
try:
# 交給服務端存根(Server Proxy)處理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server啟動ing,Port:50051")
server.run()
4.ServerCode
# 和上一節一樣
# 5.RPC Server(被調用者)本地執行后將結果返回給服務端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
5.ServerStub
import socket
import pickle
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def unpack(self, data):
"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
func, args = pickle.loads(data)
if args:
return (func, args) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
return pickle.dumps(result)
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 如果沒有這個方法則返回None
func = getattr(self.mycode, func)
if args:
return func(*args) # 解包
else:
return func() # 無參函數
def handle(self, client_socket, client_addr):
while True:
# 獲取客戶端發送的數據包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 執行無參函數
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 執行帶參函數
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy組件
data = self.pack(data) # 把函數執行結果按指定協議打包
# 把處理過的數據發送給客戶端
client_socket.send(data)
else:
print(f"客戶端:{client_addr}已斷開\n")
break
輸出圖示:
然后關於RPC高級的內容(會涉及到注冊中心
),咱們后面說架構的時候繼續,網絡這邊就說到這