深入篇¶
上節回顧:5種IO模型 | IO多路復用 and 萬物互聯之~網絡編程加強篇
官方文檔:https://docs.python.org/3/library/internet.html
總結篇:【總結篇】一文搞定網絡編程
1.概念回顧¶
1.1.TCP三次握手¶
畫一張圖來通俗化講講TCP三次握手:
用代碼來說,大概過程就是:
1.2.TCP四次揮手¶
畫圖通俗講下TCP四次揮手:
用代碼來說,大概過程就是:
其實這個也很好的解釋了之前的端口占用問題,如果是服務端先斷開連接,那么服務器就是四次揮手的發送方,
最后一次消息是得不到回復的,端口就會保留一段時間
(服務端的端口固定)也就會出現端口占用的情況。如果是客戶端先斷開,那下次連接會自動換個端口,不影響(客戶端的端口是隨機分配的)
PS:之前我們講端口就是send
一個空消息,很多人不是很清楚,這邊簡單驗證下就懂了:
1.3.HTTP¶
之前其實已經寫了個簡版的Web服務器了,簡單回顧下流程:
- 輸入要訪問的網址,在回車的那一瞬間瀏覽器和服務器建立了
TCP三次握手
- 然后瀏覽器
send
一個http的請求報文,服務器接recv
之后進行相應的處理並返回對應的頁面 - 瀏覽器關閉頁面時(
client close
),進行了TCP四次揮手
然后簡單說下HTTP狀態碼:
- 20x系列:服務器正常響應
- 30x系列:重定向
- 301:代表永久重定向,瀏覽器下次訪問這個頁面就直接去目的url了(不推薦)
- 302:臨時重定向,項目升級之后Url經常變,這個302經常用
- eg:訪問
baidu.com
=302=>www.baidu.com
- eg:訪問
- 304:這個是重定向到本地緩存(之前NodeJS說過就不詳細說了)
- 服務器文件沒更新,瀏覽器就直接訪問本地緩存了
- 40x系列:一般都是客戶端請求有問題
- eg:
404 not found
- eg:
- 50x系列:一般都是服務端出問題了
- eg:
500 Server Error
- eg:
2.動態服務器(WSGI
)¶
2.1.簡化版動態服務器¶
我們先自己定義一個動態服務器:
import re
import socket
class HTTPServer(object):
def __init__(self):
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind(('', 8080))
tcp_server.listen()
while True:
self.client_socket, self.client_address = tcp_server.accept()
self.handle()
def response(self, status, body=None):
print(status)
header = f"HTTP/1.1 {status}\r\n\r\n"
with self.client_socket:
self.client_socket.send(header.encode("utf-8"))
if body:
self.client_socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handle(self):
with self.client_socket:
print(f"[來自{self.client_address}的消息:]\n")
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
print(header)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.groups(1)[0]
# Python三元表達式(之前好像忘說了)
url = "/index.html" if url == "/" else url
print("請求url:", url)
body = str()
# 動態頁面
if ".py" in url:
# 提取模塊名(把開頭的/和.py排除)
body = self.__dynamic_handler(url[1:-3])
else: # 靜態服務器
body = self.__static_handler(url)
# 根據返回的body內容,返回對應的響應碼
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else: # 匹配不到url(基本上不會發生,不排除惡意修改)
self.response((404, "404 Not Found"))
if __name__ == "__main__":
import sys
# 防止 __import__ 導入模塊的時候找不到,忘了可以查看:
# https://www.cnblogs.com/dotnetcrazy/p/9253087.html#5.自己添加模塊路徑
sys.path.insert(1, "./www/bin")
HTTPServer()
效果:
代碼不難其中有個技術點說下:模塊名為字符串怎么導入
?
# test.py
# 如果模塊名是字符串,需要使用__import__
s = "time"
time = __import__(s)
def application():
return time.ctime() # 返回字符串
if __name__ == "__main__":
time_str = application()
print(type(time_str))
print(time_str)
輸出:
<class 'str'>
Thu Dec 20 22:48:07 2018
2.2.路由版動態服務器¶
和上面基本一樣,多了個路由表(self.router_urls
)而已
import re
import socket
class HttpServer(object):
def __init__(self):
# 路由表
self.router_urls = {"/test": "/test.py", "/user": "/test2.py"}
def run(self):
with socket.socket() as server:
# 端口復用
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
print(f"[{self.client_address}已上線]")
self.handler()
def response(self, status, body=None):
with self.client_socket as socket:
header = f"HTTP/1.1 {status}\r\n\r\n"
socket.send(header.encode("utf-8"))
if body:
socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handler(self):
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.group(1)
print(url) # print url log
body = None
# 路由有記錄:動態頁面
if url in self.router_urls.keys():
url = self.router_urls[url]
# 切片提取模塊名
body = self.__dynamic_handler(url[1:-3])
else: # 靜態服務器
if url == "/":
url = "/index.html"
body = self.__static_handler(url)
# 沒有這個頁面或者出錯
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else:
# 404
self.response("404 Not Found")
else:
print(f"{self.client_address}已下線")
self.client_socket.close()
if __name__ == "__main__":
import sys
# 臨時添加模塊所在路徑
sys.path.insert(1, "./www/bin")
HttpServer().run()
輸出:
看一眼test2.py
:
# test2.py
def application():
return "My Name Is XiaoMing"
if __name__ == "__main__":
print(application())
2.3.官方接口版¶
官方文檔:https://docs.python.org/3/library/wsgiref.html
其實Python官方提供了一個WSGI:Web Server Gateway Interface
的約定:
它只要求Web開發者實現一個
application
函數,就可以響應HTTP請求
2.3.1演示¶
eg:(只要對應的python文件提供了 application(env,start_response)
方法就行了)
# hello.py
# env 是一個字典類型
def application(env, start_response):
# 設置動態頁面的響應頭(回頭服務器會再加上自己的響應頭)
# 列表里面的 item 是 tuple
start_response("200 OK", [("Content-Type", "text/html")])
# 返回一個列表
return ["<h1>This is Test!</h1>".encode("utf-8")]
先使用官方的簡單服務器看看:
from wsgiref.simple_server import make_server
# 導入我們自己編寫的application函數:
from hello import application
# 創建一個服務器,端口是8080,處理函數是application:
httpd = make_server('', 8080, application)
print('Serving HTTP on port 8080...')
# 開始監聽HTTP請求:
httpd.serve_forever()
運行后效果:127.0.0.1:8080
This is Test!
如果把hello.py
改成下面代碼(服務端不變),那么就可以獲取一些請求信息了:
def application(env, start_response):
print(env["PATH_INFO"])
start_response("200 OK", [("Content-Type", "text/html")])
return [f'<h1>Hello, {env["PATH_INFO"][1:] or "web"}!</h1>'.encode("utf-8")]
輸出:
2.3.2說明¶
上面的application()
函數就是符合WSGI
標准的一個HTTP處理函數
,它接收兩個參數:
environ
:一個包含所有HTTP請求信息的dict
對象;start_response
:一個發送HTTP響應的函數(調用服務器定義的方法)Header
只能發送一次 ==> 只能調用一次start_response()
函數
有了WSGI
,我們關心的就是如何從env
這個dict
對象拿到HTTP請求信息
,然后構造HTML
,通過start_response()
發送Header
,最后返回Body內容
Python內置了一個WSGI
服務器,這個模塊叫wsgiref
,它是用純Python
編寫的WSGI
服務器的參考實現(完全符合WSGI
標准,但是不考慮任何運行效率,僅供開發和測試使用)
PS:這樣的好處就是,只要符合WSGI
規范的服務器,我們都可以直接使用了
其實通過源碼就可以知道這個WSGIServer
到底是何方神聖了:
class WSGIServer(HTTPServer):
pass
# HTTPServer其實就是基於TCPServer
class HTTPServer(socketserver.TCPServer):
pass
# 這個就是我們開頭說的Python封裝的簡單WebServer了
class TCPServer(BaseServer):
pass
如果還是記不得可以回顧下上次說的內容,提示:
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
如果你想要在這個基礎上進行處理,可以和上面說的一樣,定義一個繼承class WSGIRequestHandler(BaseHTTPRequestHandler)
的類,然后再處理
2.3.3.自定義¶
在本小節結束前我們模仿一下示例,定義一個符合WSGI
規范的簡單服務器:
import re
import socket
from index import WebFrame
class WSGIServer(object):
def __init__(self):
# 請求頭
self.env = dict()
# 存放處理后的響應頭
self.response_headers = str()
def run(self):
with socket.socket() as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
self.handler()
# 轉換瀏覽器請求頭格式
def request_headers_handler(self, headers):
# 過濾一下空字符串(不能過濾空列表)
headers = list(filter(None, headers.split("\r\n")))
# 提取 Method 和 Url
ret = re.match("^([\w]+?) (/[^ ]*?) .+$", headers[0])
if ret:
self.env["method"] = ret.group(1)
url = ret.group(2)
print(url)
self.env["path"] = "/index.html" if url == "/" else url
else:
return None
# [['Host', ' localhost:8080'], ['Connection', ' keep-alive']...]
array = map(lambda item: item.split(":", 1), headers[1:])
for item in array:
self.env[item[0].lower()] = item[1]
# print(self.env)
return "ok"
# 響應客戶端(吐槽一下,request和response的headers為毛格式不一樣,這設計真不合理!)
def start_response(self, status, header_list=[]):
# 響應頭
self.response_headers = f"HTTP/1.1 {status}\r\n"
for item in header_list:
self.response_headers += f"{item[0]}:{item[1]}\r\n"
# print(self.response_headers)
# 響應瀏覽器
def response(self, body):
with self.client_socket as client:
# 省略一系列服務器響應的headers
self.response_headers += "server:WSGIServer\r\n\r\n"
client.send(self.response_headers.encode("utf-8"))
if body:
client.send(body)
def handler(self):
with self.client_socket as client:
data = client.recv(2048)
if data:
# 瀏覽器請求頭
headers = data.decode("utf-8")
if self.request_headers_handler(headers):
# 模仿php所有請求都一個文件處理
body = WebFrame().application(self.env,
self.start_response)
# 響應瀏覽器
self.response(body)
else:
self.start_response("404 Not Found")
else:
client.close()
if __name__ == "__main__":
WSGIServer().run()
自己定義的框架:
class WebFrame(object):
def __init__(self):
# 路由表
self.router_urls = {"/time": "get_time", "/user": "get_name"}
def get_time(self):
import time
return time.ctime().encode("utf-8")
def get_name(self):
return "<h2>My Name Is XiaoMing</h2>".encode("utf-8")
def application(self, env, start_response):
body = b""
url = env["path"]
# 請求的頁面都映射到路由對應的方法中
if url in self.router_urls.keys():
func = self.router_urls[url]
body = getattr(self, func)()
else:
# 否則就請求對應的靜態資源
try:
with open(f"./www{url}", "rb") as fs:
body = fs.read()
except Exception as ex:
start_response("404 Not Found")
print(ex)
return b"404 Not Found" # 出錯就直接返回了
# 返回對應的頁面響應頭
start_response("200 ok", [("Content-Type", "text/html"),
("Scripts", "Python")])
return body
輸出:
知識擴展:
從wsgiref模塊導入
https://docs.python.org/3/library/wsgiref.html
Python服務器網關接口
https://www.python.org/dev/peps/pep-3333/
Python原始套接字和流量嗅探
https://blog.csdn.net/cj1112/article/details/51303021
https://blog.csdn.net/peng314899581/article/details/78082244
【源碼閱讀】輕量級Web框架:bottle
https://github.com/bottlepy/bottle
3.RPC引入¶
上篇回顧:萬物互聯之~深入篇
其他專欄最新篇:協程加強之~兼容答疑篇 | 聊聊數據庫~SQL環境篇
Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/
3.1.概念¶
RPC
(Remote Procedure Call
):分布式系統常見的一種通信方法(遠程過程調用),通俗講:可以一台計算機的程序調用另一台計算機的子程序(可以把它看成之前我們說的進程間通信,只不過這一次的進程不在同一台PC上了)
PS:RPC
的設計思想是力圖使遠程調用中的通訊細節對於使用者透明,調用雙方無需關心網絡通訊的具體實現
引用一張網上的圖:
和HTTP
有點相似,你可以這樣理解:
- 老版本的
HTTP/1.0
是短鏈接,而RPC
是長連接進行通信- HTTP協議(header、body),RPC可以采取HTTP協議,也可以自定義二進制格式
- 后來
HTTP/1.1
支持了長連接(Connection:keep-alive
),基本上和RPC
差不多了- 但
keep-alive
一般都限制有最長時間,或者最多處理的請求數,而RPC
是基於長連接的,基本上沒有這個限制
- 但
- 后來谷歌直接基於
HTTP/2.0
建立了gRPC
,它們之間的基本上也就差不多了- 如果硬是要區分就是:
HTTP-普通話
和RPC-方言
的區別了 - RPC高效而小眾,HTTP效率沒RPC高,但更通用
- 如果硬是要區分就是:
- PS:
RPC
和HTTP
調用不用經過中間件,而是端到端的直接數據交互- 網絡交互可以理解為基於
Socket
實現的(RPC
、HTTP
都是Socket
的讀寫操作)
- 網絡交互可以理解為基於
簡單概括一下RPC
的優缺點就是:
- 優點:
- 效率更高(可以自定義二進制格式)
- 發起RPC調用的一方,在編寫代碼時可忽略RPC的具體實現(跟編寫本地函數調用一般)
- 缺點:
- 通用性不如HTTP(方言普及程度肯定不如普通話),如果傳輸協議不是HTTP協議格式,調用雙方就需要專門實現通信庫
PS:HTTP更多是Client
與Server
的通訊;RPC
更多是內部服務器間的通訊
3.2.引入¶
上面說這么多,可能還沒有來個案例實在,我們看個案例:
本地調用sum()
:
def sum(a, b):
"""return a+b"""
return a + b
def main():
result = sum(1, 2)
print(f"1+2={result}")
if __name__ == "__main__":
main()
輸出:(這個大家都知道)
1+2=3
1.xmlrpc案例¶
官方文檔:
https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html
都說RPC
用起來就像本地調用一樣,那么用起來啥樣呢?看個案例:
服務端:(CentOS7:192.168.36.123:50051
)
from xmlrpc.server import SimpleXMLRPCServer
def sum(a, b):
"""return a+b"""
return a + b
# PS:50051是gRPC默認端口
server = SimpleXMLRPCServer(('', 50051))
# 把函數注冊到RPC服務器中
server.register_function(sum)
print("Server啟動ing,Port:50051")
server.serve_forever()
客戶端:(Win10:192.168.36.144
)
from xmlrpc.client import ServerProxy
stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")
輸出:(Client
用起來是不是和本地差不多?就是通過代理訪問了下RPCServer
而已)
1+2=3
PS:CentOS
服務器不是你綁定個端口就一定能訪問的,如果不能記讓防火牆開放對應的端口
這個之前在說MariaDB
環境的時候有詳細說:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4
# 添加 --permanent永久生效(沒有此參數重啟后失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent
2.ZeroRPC案例:¶
zeroRPC用起來和這個差不多,也簡單舉個例子吧:
把服務的某個方法注冊到RPCServer
中,供外部服務調用
import zerorpc
class Test(object):
def say_hi(self, name):
return f"Hi,My Name is{name}"
# 注冊一個Test的實例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()
調用服務端代碼:
import zerorpc
client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)
3.3.簡單版自定義RPC¶
看了上面的引入案例,是不是感覺RPC
不過如此?NoNoNo,要是真這么簡單也就談不上RPC架構
了,上面兩個是最簡單的RPC服務了,可以這么說:生產環境基本上用不到,只能當案例練習罷了,對Python來說,最常用的RPC就兩個gRPC
and Thrift
PS:國產最出名的是Dubbo
and Tars
,Net最常用的是gRPC
、Thrift
、Surging
1.RPC服務的流程¶
要自己實現一個RPC Server
那么就得了解整個流程了:
Client
(調用者)以本地調用的方式發起調用- 通過
RPC
服務進行遠程過程調用(RPC的目標就是要把這些步驟都封裝起來,讓使用者感覺不到這個過程)- 客戶端的
RPC Proxy
組件收到調用后,負責將被調用的方法名、參數
等打包編碼成自定義的協議 - 客戶端的
RPC Proxy
組件在打包完成后通過網絡把數據包發送給RPC Server
- 服務端的
RPC Proxy
組件把通過網絡接收到的數據包按照相應格式進行拆包解碼
,獲取方法名和參數 - 服務端的
RPC Proxy
組件根據方法名和參數進行本地調用 RPC Server
(被調用者)本地執行后將結果返回給服務端的RPC Proxy
- 服務端的
RPC Proxy
組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy
組件 - 客戶端的
RPC Proxy
組件收到數據包后,進行拆包解碼,把數據返回給Client
- 客戶端的
Client
(調用者)得到本次RPC
調用的返回結果
用一張時序圖來描述下整個過程:
PS:RPC Proxy
有時候也叫Stub
(存根):(Client Stub,Server Stub)
為屏蔽客戶調用遠程主機上的對象,必須提供某種方式來模擬本地對象,這種本地對象稱為存根(stub),存根負責接收本地方法調用,並將它們委派給各自的具體實現對象
PRC服務實現的過程中其實就兩核心點:
- 消息協議:客戶端調用的參數和服務端的返回值這些在網絡上傳輸的數據以何種方式打包編碼和拆包解碼
- 經典代表:
Protocol Buffers
- 經典代表:
- 傳輸控制:在網絡中數據的收發傳輸控制具體如何實現(
TCP/UDP/HTTP
)
2.手寫RPC¶
下面我們就根據上面的流程來手寫一個簡單的RPC:
1.Client調用:
# client.py
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
輸出:
1+2=3
1.1+2=3.1
Wed Jan 16 22
2.Client Stub,客戶端存根:(主要有打包
、解包
、和RPC服務器通信
的方法)
# client_stub.py
import socket
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def convert(self, obj):
"""根據類型轉換成對應的類型編號"""
if isinstance(obj, int):
return 1
if isinstance(obj, float):
return 2
if isinstance(obj, str):
return 3
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議
格式:func:函數名@params:類型-參數,類型2-參數2...
"""
result = f"func:{func}"
if args:
params = ""
# params:類型-參數,類型2-參數2...
for item in args:
params += f"{self.convert(item)}-{item},"
# 去除最后一個,
result += f"@params:{params[:-1]}"
# print(result) # log 輸出
return result.encode("utf-8")
def unpack(self, data):
"""解包:獲取返回結果"""
msg = data.decode("utf-8")
# 格式應該是"data:xxxx"
params = msg.split(":")
if len(params) > 1:
return params[1]
return None
def get(self, func, args=None):
"""1.客戶端的RPC Proxy組件收到調用后,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
data = self.pack(func, args)
# 2.客戶端的RPC Proxy組件在打包完成后通過網絡把數據包發送給RPC Server
self.socket.send(data)
# 等待服務端返回結果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
簡要說明下:(我根據流程在Code里面標注了,看起來應該很輕松)
之前有說到核心其實就是消息協議
and傳輸控制
,我客戶端存根
的消息協議是自定義的格式(后面會說簡化方案):func:函數名@params:類型-參數,類型2-參數2...
,傳輸我是基於TCP進行了簡單的封裝
3.Server端:(實現很簡單)
# server.py
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服務端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口復用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 綁定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客戶端連接
client_socket, client_addr = self.socket.accept()
print(f"來自{client_addr}的請求:\n")
try:
# 交給服務端存根(Server Proxy)處理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server啟動ing,Port:50051")
server.run()
為了簡潔,服務端代碼我單獨放在了server_code.py
中:
# 5.RPC Server(被調用者)本地執行后將結果返回給服務端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
4.然后再看看重頭戲Server Stub
:
# server_stub.py
import socket
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def convert(self, num, obj):
"""根據類型編號轉換類型"""
if num == "1":
obj = int(obj)
if num == "2":
obj = float(obj)
if num == "3":
obj = str(obj)
return obj
def unpack(self, data):
"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
msg = data.decode("utf-8")
# 格式應該是"格式:func:函數名@params:類型編號-參數,類型編號2-參數2..."
array = msg.split("@")
func = array[0].split(":")[1]
if len(array) > 1:
args = list()
for item in array[1].split(":")[1].split(","):
temps = item.split("-")
# 類型轉換
args.append(self.convert(temps[0], temps[1]))
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
# 格式:"data:返回值"
return f"data:{result}".encode("utf-8")
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 如果沒有這個方法則返回None
func = getattr(self.mycode, func, None)
if args:
return func(*args) # 解包
else:
return func() # 無參函數
def handle(self, client_socket, client_addr):
while True:
# 獲取客戶端發送的數據包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 執行無參函數
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 執行帶參函數
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy組件
data = self.pack(data) # 把函數執行結果按指定協議打包
# 把處理過的數據發送給客戶端
client_socket.send(data)
else:
print(f"客戶端:{client_addr}已斷開\n")
break
再簡要說明一下:里面方法其實主要就是解包
、執行函數
、返回值打包
輸出圖示:
再貼一下上面的時序圖:
課外拓展:
HTTP1.0、HTTP1.1 和 HTTP2.0 的區別
https://www.cnblogs.com/heluan/p/8620312.html
簡述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994
分布式基礎—RPC
http://www.dataguru.cn/article-14244-1.html
下節預估:RPC服務進一步簡化與演變、手寫一個簡單的REST接口
4.RPC簡化與提煉¶
上篇回顧:萬物互聯之~RPC專欄 https://www.cnblogs.com/dunitian/p/10279946.html
上節課解答¶
之前有網友問,很多開源的RPC中都是使用路由表,這個怎么實現?
其實路由表實現起來也簡單,代碼基本上不變化,就修改一下server_stub.py
的__init__
和exe
兩個方法就可以了:
class ServerStub(object):
def __init__(self, mycode):
self.func_dict = dict()
# 初始化一個方法名和方法的字典({func_name:func})
for item in mycode.__dir__():
if not item.startswith("_"):
self.func_dict[item] = getattr(mycode, item)
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 如果沒有這個方法則返回None
# func = getattr(self.mycode, func, None)
func = self.func_dict[func]
if args:
return func(*args) # 解包
else:
return func() # 無參函數
4.1.Json序列化¶
Python比較6的同志對上節課的Code肯定嗤之以鼻,上次自定義協議是同的通用方法,這節課我們先來簡化下代碼:
再貼一下上節課的時序圖:
1.Json知識點¶
官方文檔:https://docs.python.org/3/library/json.html
# 把字典對象轉換為Json字符串
json_str = json.dumps({"func": func, "args": args})
# 把Json字符串重新變成字典對象
data = json.loads(data)
func, args = data["func"], data["args"]
需要注意的就是類型轉換了(eg:python tuple
==> json array
)
Python | JSON |
---|---|
dict | object |
list, tuple | array |
str | string |
int, float | number |
True | true |
False | false |
None | null |
PS:序列化:json.dumps(obj)
,反序列化:json.loads(json_str)
2.消息協議采用Json格式¶
在原有基礎上只需要修改下Stub
的pack
和unpack
方法即可
Client_Stub(類型轉換都省掉了)
import json
import socket
class ClientStub(object):
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議
格式:{"func": "sum", "args": [1, 2]}
"""
json_str = json.dumps({"func": func, "args": args})
# print(json_str) # log 輸出
return json_str.encode("utf-8")
def unpack(self, data):
"""解包:獲取返回結果"""
data = data.decode("utf-8")
# 格式應該是"{data:xxxx}"
data = json.loads(data)
# 獲取不到就返回None
return data.get("data", None)
# 其他Code我沒有改變
Server Stub()
import json
import socket
class ServerStub(object):
def unpack(self, data):
"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
data = data.decode("utf-8")
# 格式應該是"格式:{"func": "sum", "args": [1, 2]}"
data = json.loads(data)
func, args = data["func"], data["args"]
if args:
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
# 格式:"data:返回值"
json_str = json.dumps({"data": result})
return json_str.encode("utf-8")
# 其他Code我沒有改變
輸出圖示:
4.2.Buffer序列化¶
RPC其實更多的是二進制的序列化方式,這邊簡單介紹下
1.pickle知識點¶
官方文檔:https://docs.python.org/3/library/pickle.html
用法和Json
類似,PS:序列化:pickle.dumps(obj)
,反序列化:pickle.loads(buffer)
2.簡單案例¶
和Json案例類似,也只是改了pack
和unpack
,我這邊就貼一下完整代碼(防止被吐槽)
1.Client
# 和上一節一樣
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
2.ClientStub
import socket
import pickle
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def pack(self, func, args):
"""打包:把方法和參數拼接成自定義的協議"""
return pickle.dumps((func, args))
def unpack(self, data):
"""解包:獲取返回結果"""
return pickle.loads(data)
def get(self, func, args=None):
"""1.客戶端的RPC Proxy組件收到調用后,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
data = self.pack(func, args)
# 2.客戶端的RPC Proxy組件在打包完成后通過網絡把數據包發送給RPC Server
self.socket.send(data)
# 等待服務端返回結果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
3.Server
# 和上一節一樣
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服務端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口復用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 綁定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客戶端連接
client_socket, client_addr = self.socket.accept()
print(f"來自{client_addr}的請求:\n")
try:
# 交給服務端存根(Server Proxy)處理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer(('', 50051), MyCode())
print("Server啟動ing,Port:50051")
server.run()
4.ServerCode
# 和上一節一樣
# 5.RPC Server(被調用者)本地執行后將結果返回給服務端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
5.ServerStub
import socket
import pickle
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def unpack(self, data):
"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
func, args = pickle.loads(data)
if args:
return (func, args) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和參數拼接成自定義的協議"""
return pickle.dumps(result)
def exec(self, func, args=None):
"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
# 如果沒有這個方法則返回None
func = getattr(self.mycode, func)
if args:
return func(*args) # 解包
else:
return func() # 無參函數
def handle(self, client_socket, client_addr):
while True:
# 獲取客戶端發送的數據包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 執行無參函數
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 執行帶參函數
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy組件
data = self.pack(data) # 把函數執行結果按指定協議打包
# 把處理過的數據發送給客戶端
client_socket.send(data)
else:
print(f"客戶端:{client_addr}已斷開\n")
break
輸出圖示:
然后關於RPC高級的內容(會涉及到注冊中心
),咱們后面說架構的時候繼續,網絡這邊就說到這
5.Restful API¶
RESTful只是接口協議規范,它是建立在http基礎上的,我們在網絡加強篇的末尾簡單帶一下,后面講爬蟲應該會再給大家說的
5.1.實現一個簡單的REST接口¶
在編寫REST接口時,一般都是為HTTP服務的。為了實現一個簡單的REST接口,你只需讓代碼滿足Python的WSGI
標准即可
1.Restful引入¶
這邊我就不自己實現了(上面手寫服務器的時候其實已經展示了Restful接口是啥樣),用Flask
快速過一遍:
看個引入案例:
import flask
app = flask.Flask(__name__)
@app.route("/")
def index():
return "This is Restful API Test"
if __name__ == "__main__":
app.run()
圖示輸出:
Server Log:
* Serving Flask app "1.test" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: off
* Running on http://127.0.0.1:8080/ (Press CTRL+C to quit)
127.0.0.1 - - [17/Jan/2019 17:24:02] "GET / HTTP/1.1" 200 -
2.簡單版RESTful Services¶
舉個查詢服務器節點
信息的例子:/api/servers/
import flask
from infos import info_list
app = flask.Flask(__name__)
# Json的404自定義處理(不加自定義處理會返回默認404頁面)
@app.errorhandler(404)
def not_found(error):
return flask.make_response(
flask.jsonify({
"data": "Not Found",
"status": 404
}), 404)
# 運行Get和Post請求
@app.route("/api/v1.0/servers/<name>", methods=["GET", "POST"])
def get_info(name):
infos = list(filter(lambda item: item["name"] == name, info_list))
if len(infos) == 0:
flask.abort(404) # 404
# 基於json.dumps的封裝版
return flask.jsonify({"infos": infos}) # 返回Json字符串
if __name__ == "__main__":
app.run(port=8080)
圖示輸出:(不深入說,后面爬蟲會再提的)
課后拓展:
RESTful API 設計指南
http://www.ruanyifeng.com/blog/2014/05/restful_api.html
RESTful API 最佳實踐
http://www.ruanyifeng.com/blog/2018/10/restful-api-best-practices.html
異步 API 的設計
http://www.ruanyifeng.com/blog/2018/12/async-api-design.html
使用python的Flask實現一個RESTful API服務器端[翻譯]
https://www.cnblogs.com/vovlie/p/4178077.html