150行代碼搭建異步非阻塞Web框架


最近看Tornado源碼給了我不少啟發,心血來潮決定自己試着只用python標准庫來實現一個異步非阻塞web框架。花了點時間感覺還可以,一百多行的代碼已經可以撐起一個極簡框架了。

一、准備工作

需要的相關知識點:

  • HTTP協議的請求和響應
  • IO多路復用
  • asyncio

掌握上面三個點的知識就完全沒有問題,不是很清楚的同學我也推薦幾篇參考文章

  HTTP協議詳細介紹(https://www.cnblogs.com/haiyan123/p/7777924.html

  Python篇-IO多路復用詳解(https://www.jianshu.com/p/818f27379a5e

  Python異步IO之協程(一):從yield from到async的使用(https://blog.csdn.net/SL_World/article/details/86597738

實驗環境:

python 3.7.3

 由於在框架中會使用到async/await關鍵字,所以只要確保python版本在3.5以上即可。

 二、框架功能目標

我們的框架要實現最基本的幾個功能:

  • 封裝HTTP請求響應
  • 路由映射
  • 類視圖和函數視圖
  • 協程支持

 當然一個完善的web框架需要實現的遠遠不止這些,這里我們現在只需要它能跑起來就足夠了。

三、封裝HTTP協議

HTTP是基於TCP/IP通信協議來實現數據傳輸,與一般的C/S相比,它的特點在於當客戶端(瀏覽器)向服務端發起HTTP請求,服務端響應數據后雙方立馬斷開連接,服務端無法主動向客戶端發送數據。HTTP協議數據傳輸內容分為請求頭和請求體,請求頭和請求體之間使用"\r\n\r\n"進行分隔。在請求頭中,第一行包含了請求方式,請求路徑和HTTP協議,此后每一行以key: value的形式傳輸數據。

對於我們的web服務端來說,需要的就是解析http請求和處理http響應。

我們通過寫兩個類,HttpRequest和HttpResponse來實現。

3.1 HttpRequest

HttpRequest設計目標是解析從socket接收request數據

 1 class HttpRequest(object):
 2     def __init__(self, content: bytes):
 3         self.content = content.decode('utf-8')
 4         self.headers = {}
 5         self.GET = {}
 6         self.url = ''
 7         self.full_path = ''
 8         self.body = ''
 9         try:
10             header, self.body = self.content.split('\r\n\r\n')
11             temp = header.split('\r\n')
12             first_line = temp.pop(0)
13             self.method, self.url, self.protocol = first_line.split(' ')
14             self.full_path = self.url
15             for t in temp:
16                 k, v = t.split(': ', 1)
17                 self.headers[k] = v
18         except Exception as e:
19             print(e)
20         if len(self.url.split('?')) > 1: # 解析GET參數
21             self.url = self.full_path.split('?')[0] # 把url中攜帶的參數去掉
22             parms = self.full_path.split('?')[1].split('&')
23             for p in parms: # 將GET參數添加到self.GET字典
24                 k, v = p.split('=')
25                 self.GET[k] = v

在類中,我們實現解析http請求的headers、method、url和GET參數,其實還有很多事情沒有做,比如使用POST傳輸數據時,數據是在請求體中,針對這部分內容我並沒有開始寫,原因在於本文主要目的還是異步非阻塞框架,目前的功能已經足以支持我們進行下一步實驗了。

3.2 HttpResponse

HTTP響應也可以分為響應頭和響應體,我們可以很簡單的實現一個response:

 1 class HttpResponse(object):
 2     def __init__(self, data: str):
 3         self.status_code = 200 # 默認響應狀態 200
 4         self.headers = 'HTTP/1.1 %s OK\r\n'
 5         self.headers += 'Server:AsyncWeb'
 6         self.headers += '\r\n\r\n'
 7         self.data = data
 8 
 9     @property
10     def content(self):
11         return bytes((self.headers + self.data) % self.status_code, encoding='utf8')

HttpResponse中並沒有做太多的事情,接受一個字符串,並使用content返回一個滿足HTTP響應格式的bytes。

從用戶調用角度,可以使用return HttpResponse("歡迎來到AsynicWeb")來返回數據。

我們也可以簡單的定義一個404頁面:

Http404 = HttpResponse('<html><h1>404</h1></html>')
Http404.status_code = 404

四、路由映射

路由映射簡單理解就是從一個URL地址找到對應的邏輯函數。舉個例子,我們訪問http://127.0.0.1:8000這個頁面,在http請求中它的url是"/",在web服務器中有一個函數index,web服務器能夠由url地址"/"找到函數index,這就是一個路由映射。

其實路由映射實現起來非常簡單。我們只要定義一個映射列表,列表中的每個元素包含url和邏輯處理(視圖函數)兩部分,當一個http請求到達的時候,遍歷映射列表,使用正則匹配每一個url,如果請求的url和映射表中的相同,我們就可以取出對應的視圖函數。

路由映射表是完全由用戶來定義映射關系的,它應該使用一個我們定義的標准結構,比如:

routers = [
    ('/$', IndexView),
    ('/home', asy)
]

 

五、類視圖和函數視圖

視圖是指能夠根據一個請求,執行某些邏輯運算,最終返回響應的模塊。說到這里,一個web框架的運行流程就出來了:

    http請求——路由映射表——視圖——執行視圖獲取返回值——http響應

在我們的框架中,借鑒Django的設計,我們讓它支持類視圖(CBV)和函數視圖(FBV)兩種模式。

對於函數視圖,完全由用戶自己定義,只要至少能夠接受一個request參數即可

對於類視圖,我們需要做一些預處理,確保用戶按我們的規則來實現類視圖。

定義一個View類:

1 class View(object):
2     # CBV應繼承View類
3     def dispatch(self, request):
4         method = request.method.lower()
5         if hasattr(self, method):
6             return getattr(self, method)(request)
7         else:
8             return Http404

 在View類中,我們只寫了一個dispatch方法,其實就做了一件事:反射。當我們在路由映射表中找對應的視圖時,如果判斷視圖屬於類,我們就調用dispatch方法。

從用戶角度來看,實現一個CBV只需要繼承View類,然后通過定義get、post、delete等方法來實現不同的處理。

六、socket和多路復用

上面幾個小節實現了web框架的大體執行路徑,從這節開始我們實現web服務器的核心。

通過IO多路復用可以達到單線程實現高並發的效果,一個標准的IO多路復用寫法:

 1 server = socket(AF_INET, SOCK_STREAM)
 2 server.bind(("127.0.0.1", 8000))
 3 server.setblocking(False) # 設置非阻塞
 4 server.listen(128)
 5 Future_Task_Wait = {}
 6 rlist = [server, ]
 7 while True:
 8     r, w, x = select.select(rlist, [], [], 0.1)
 9     for o in r:
10         if o == server:
11             '''判斷o是server還是conn'''
12             conn, addr = o.accept()
13             conn.setblocking(False) # 設置非阻塞
14             rlist.append(conn) # 客戶連接 加入輪詢列表
15         else:
16             data = b""
17             while True: # 接收客戶傳輸數據
18                 try:
19                     chunk = o.recv(1024)
20                     data = data + chunk
21                 except Exception as e:
22                     chunk = None
23                 if not chunk:
24                     break
25             dosomething(o, data, routers) # 拿到數據干點啥

 通過這段代碼我們可以獲得所有的請求了,下一步就是處理這些請求。

我們就定義一個dosomething函數

 1 import re
 2 import time
 3 from types import FunctionType
 4 
 5 def dosomething(o, data, routers):
 6     '''解析http請求,尋找映射函數並執行得到結果
7 :param o: socket連接對象 8 :param data: socket接收數據 9 :return: 響應結果 10 ''' 11 request = HttpRequest(data) 12 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 13 request.method, request.url) 14 flag = False 15 for router in routers: 16 if re.match(router[0], request.url): 17 target = router[1] 18 flag = True 19 break 20 if flag: 21 # 判斷targe是函數還是類 22 if isinstance(target, FunctionType): 23 result = target(request) 24 elif issubclass(target, View): 25 result = target().dispatch(request) 26 else: 27 result = Http404 28 else: 29 result = Http404 30 return result

這段代碼做了這么幾件事。1.實例化HttpRequest;2.使用正則遍歷路由映射表;3.將request傳入視圖函數或類視圖的dispatch方法;4.拿到result結果

我們通過result = dosomething(o, data, routers)可以拿到結果,接下來我們只需要把結果發回給客戶端並斷開連接就可以了

o.sendall(result.content)  # 由於result是一個HttpResponse對象 我們使用content屬性
rlist.remove(o) # 從輪詢中刪除連接
o.close() # 關閉連接

至此,我們的web框架已經搭建好了。

但它還是一個同步的框架,在我們的服務端中,其實一直通過while循環在監聽select是否變化,假如我們在視圖函數中添加IO操作,其他連接依然會阻塞等待,接下來讓我們的框架實現對協程的支持。

七、協程支持

在實現協程之前,我們先聊聊Tornado的Future對象。可以說Tornado異步非阻塞的實現核心就是Future。

Future對象內部維護了一個重要屬性_result,這是一個標記位,一個剛實例化的Future內部的_result=None,我們可以通過其他操作來更改_result的狀態。另一方面,我們可以一直監聽每個Future對象的_result狀態,如果發生變化就執行某些特定的操作。

我們在第六節定義的dosomething函數中拿到了一個result,它應當是一個HttpResponse對象,那么能不能返回一個Future對象呢。

假如result是一個Future對象,我們的服務端不立馬返回結果,而是把Future放進另一個輪詢列表中,當Future內的_result改變時再返回結果,就達到了異步的效果。

我們也可以定義一個Future類,這個類維護只一個變量result:

1 class Future(object):
2     def __init__(self):
3         self.result = None

 對於框架使用者來說,在視圖函數要么返回一個HttpResponse對象代表立即返回,要么返回一個Future對象說你先別管我,我把事情干完了再通知你返回結果。

既然視圖函數返回的可能不只是HttpResponse對象,那么我們就需要對第六步的代碼增加額外的處理:

Future_Task_Wait = {} # 定義一個異步Future字典
result = dosomething() # 拿到結果后執行下面判斷
if isinstance(result, Future):
    Future_Task_Wait[o] = result # Futre對象則加入字典
else:
    o.sendall(result.content) # 非Future對象直接返回結果並斷開連接
    rlist.remove(o)
    o.close()

在while True輪詢內再增加一段代碼,遍歷Future_Task_Wait字典:

rm_conn = [] # 需要移除列表的conn
for conn, future in Future_Task_Wait.items():
    if future.result:
        try:
            conn.sendall(HttpResponse(data=future.result).content) # 返回result
        finally:
            rlist.remove(conn) 
            conn.close()
            rm_conn.append(conn)
for conn in rm_conn: # 在字典中刪除conn
    del Future_Task_Wait[conn]

 這樣,我們就可以返回一個Future來告訴服務器這是將來才返回的對象。

那回歸正題,我們到底該如何使用協程?這里我用的方法是創建一個子線程來執行協程事件循環,主線程永遠在監聽socket。

from threading import Thread
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
coroutine_loop = asyncio.new_event_loop()  # 創建協程事件循環
run_loop_thread = Thread(target=start_loop, args=(coroutine_loop,))  # 新起線程運行事件循環, 防止阻塞主線程
run_loop_thread.start()  # 運行線程,即運行協程事件循環

當我們要把asyncdo方法添加作為協程任務時

asyncio.run_coroutine_threadsafe(asyncdo(), coroutine_loop)

好了,異步非阻塞的核心代碼分析的差不多了,將六七節的代碼整合寫成一個類

  1 import re
  2 import time
  3 import select
  4 import asyncio
  5 from socket import *
  6 from threading import Thread
  7 from types import FunctionType
  8 from http.response import Http404, HttpResponse
  9 from http.request import HttpRequest
 10 from views import View
 11 from core.future import Future
 12 
 13 class App(object):
 14     # web應用程序
 15     coroutine_loop = None
 16 
 17     def __new__(cls, *args, **kwargs):
 18         # 使用單例模式
 19         if not hasattr(cls, '_instance'):
 20             App._instance = super().__new__(cls)
 21         return App._instance
 22 
 23     def listen(self, host, port, routers):
 24         # IO多路復用監聽連接
 25         server = socket(AF_INET, SOCK_STREAM)
 26         server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
 27         server.bind((host, port))
 28         server.setblocking(False)
 29         server.listen(128)
 30         Future_Task_Wait = {}
 31         rlist = [server, ]
 32         while True:
 33             r, w, x = select.select(rlist, [], [], 0.01)
 34             for o in r:
 35                 if o == server:
 36                     '''判斷o是server還是conn'''
 37                     conn, addr = o.accept()
 38                     conn.setblocking(False)
 39                     rlist.append(conn)
 40                 else:
 41                     data = b""
 42                     while True:
 43                         try:
 44                             chunk = o.recv(1024)
 45                             data = data + chunk
 46                         except Exception as e:
 47                             chunk = None
 48                         if not chunk:
 49                             break
 50                     try:
 51                         request = HttpRequest(data, o)
 52                         print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0],
 53                               request.method, request.url)
 54                         flag = False
 55                         for router in routers:
 56                             if re.match(router[0], request.url):
 57                                 target = router[1]
 58                                 flag = True
 59                                 break
 60                         if flag:
 61                             # 判斷targe是函數還是類
 62                             if isinstance(target, FunctionType):
 63                                 result = target(request)
 64                             elif issubclass(target, View):
 65                                 result = target().dispatch(request)
 66                             else:
 67                                 result = Http404
 68                         else:
 69                             result = Http404
 70                         # 判斷result是不是future
 71                         if isinstance(result, Future):
 72                             Future_Task_Wait[o] = result
 73                         else:
 74                             o.sendall(result.content)
 75                             rlist.remove(o)
 76                             o.close()
 77                     except Exception as e:
 78                         print(e)
 79             rm_conn = []
 80             for conn, future in Future_Task_Wait.items():
 81                 if future.result:
 82                     try:
 83                         conn.sendall(HttpResponse(data=future.result).content)
 84                     finally:
 85                         rlist.remove(conn)
 86                         conn.close()
 87                         rm_conn.append(conn)
 88             for conn in rm_conn:
 89                 del Future_Task_Wait[conn]
 90 
 91     def run(self, host='127.0.0.1', port=8000, routers=()):
 92         # 主線程select多路復用,處理http請求和響應
 93         # 給協程單獨創建一個子線程,負責處理View函數提交的協程
 94         def start_loop(loop):
 95             asyncio.set_event_loop(loop)
 96             loop.run_forever()
 97         self.coroutine_loop = asyncio.new_event_loop()  # 創建協程事件循環
 98         run_loop_thread = Thread(target=start_loop, args=(self.coroutine_loop,))  # 新起線程運行事件循環, 防止阻塞主線程
 99         run_loop_thread.start()  # 運行線程,即運行協程事件循環
100         self.listen(host, port, routers)

八、框架測試

現在,可以測試我們的web框架了。

 1 import asyncio
 2 from core.server import App
 3 from views import View
 4 from http.response import *
 5 from core.future import Future
 6 
 7 
 8 class IndexView(View):
 9     def get(self, request):
10         return HttpResponse('歡迎來到首頁')
11 
12     def post(self, request):
13         return HttpResponse('post')
14 
15 def asy(request):
16     future = Future()
17     print('異步調用')
18     wait = request.url.split('/')[-1]
19     try:
20         wait = int(wait)
21     except:
22         wait = 5
23     asyncio.run_coroutine_threadsafe(dosomething(future, wait), app.coroutine_loop)
24     print('返回Future')
25     return future
26 
27 async def dosomething(future, wait):
28     # 異步函數
29     await asyncio.sleep(wait)# 模擬異步操作
30     future.result = '等待了%s秒' % wait
31 
32 routers = [
33     ('/$', IndexView),
34     ('/home', asy)
35 ]
36 
37 # 從用戶角度只需使用run()
38 app = App()
39 app.run('127.0.0.1', 8080, routers=routers)

瀏覽器訪問http://127.0.0.1:8080,返回沒有問題,如果有同學使用Chrome可能會亂碼,那是因為我們的HttpResponse沒有返回指定編碼,添加一個響應頭即可。

瀏覽器訪問http://127.0.0.1:8080/home,這時候會執行協程,默認等待5s后返回結果,你可以在多個標簽頁訪問這個地址,通過等待時間來驗證我們的異步框架是否正常工作。

九、其他

至此,我們要實現的異步非阻塞web框架已經完成了。當然這個框架說到底還是太簡陋,后續完全可以優化HttpRequest和HttpResponse、增加對數據庫、模板語言等等組件的擴展。

完整源碼已經上傳至https://github.com/sswest/AsyncWeb


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM