Python中實現協程並發查詢數據庫


這周又填了一個以前挖下的坑。

這個博客系統使用Psycopy庫實現與PostgreSQL數據庫的通信。前期,只是泛泛地了解了一下SQL語言,然后就胡亂拼湊出這么一個簡易博客系統。

10月份找到工作以后,認真讀了《數據庫系統概念》這本書,對數據庫有了更深的認識。然后就開始對博客系統的數據庫查詢模塊開始重構。

改進之前

之前,我的查詢步驟很簡單,就是:

前端提交查詢請求 --> 建立數據庫連接 --> 新建游標 --> 執行命令 --> 接受結果 --> 關閉游標、連接

這幾大步驟的順序執行。

這里面當然問題很大:

  1. 建立數據庫連接實際上就是新建一個套接字。這是進程間通信的幾種方法里,開銷最大的了。
  2. 在“執行命令”和“接受結果”兩個步驟中,線程在阻塞在數據庫內部的運行過程中,數據庫連接和游標都處於閑置狀態。

這樣一來,每一次查詢都要順序的新建數據庫連接,都要阻塞在數據庫返回結果的過程中。當前端提交大量查詢請求時,查詢效率肯定是很低的。

第一次改進

之前的模塊里,問題最大的就是第一步——建立數據庫連接套接字了。如果能夠一次性建立連接,之后查詢能夠反復服用這個連接就好了。

所以,首先應該把數據庫查詢模塊作為一個單獨的守護進程去執行,而前端app作為主進程響應用戶的點擊操作。那么兩條進程怎么傳遞消息呢?翻了幾天Python文檔,終於構思出來:用隊列queue作為生產者(web前端)向消費者(數據庫后端)傳遞任務的渠道。生產者,會與SQL命令一起,同時傳遞一個管道pipe的連接對象,作為任務完成后,回傳結果的渠道。確保,任務的接收方與發送方保持一致。

作為第二個問題的解決方法,可以使用線程池來並發獲取任務隊列中的task,然后執行命令並回傳結果。

第二次改進

第一次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應速度有很明顯的加快。

但是對於第二個問題,使用線程池還是有些欠妥當。因為,CPython解釋器存在GIL問題,所有線程實際上都在一個解釋器進程里調度。線程稍微開多一點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多一點,甚至會出現“抖動”問題(也就是剛剛喚醒一個線程,就進入掛起狀態,剛剛換到棧幀或內存的上下文,又被換回內存或者磁盤),效率大大降低。也就是說,線程池的並發量很有限

試過了多進程、多線程,只能在單個線程里做文章了。

Python中的asyncio庫

Python里有大量的協程庫可以實現單線程內的並發操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫同樣可以實現協程並發。asyncio庫大大降低了Python中協程的實現難度,就像定義普通函數那樣就可以了,只是要在def前面多加一個async關鍵詞。async def函數中,需要阻塞在其他async def函數的位置前面可以加上await關鍵詞。

import asyncio

async def wait():
	await asyncio.sleep(2)
	
async def execute(task):
	process_task(task)
	await wait()
	continue_job()

async def函數的執行稍微麻煩點。需要首先獲取一個loop對象,然后由這個對象代為執行async def函數。

loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()

loop在執行execute(task)函數時,如果遇到await關鍵字,就會暫時掛起當前協程,轉而去執行其他阻塞在await關鍵詞的協程,從而實現協程並發。

不過需要注意的是,run_until_complete()函數本身是一個阻塞函數。也就是說,當前線程會等候一個run_until_complete()函數執行完畢之后,才會繼續執行下一部函數。所以下面這段代碼並不能並發執行。

for task in task_list: 
	loop.run_until_complete(task)

對與這個問題,asyncio庫也有相應的解決方案:gather函數。

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
			for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

當然了,async def函數的執行並不只有這兩種解決方案,還有call_soonrun_forever的配合執行等等,更多內容還請參考官方文檔。

Python下的I/O多路復用

協程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復用則完全不存在這個問題。

目前,Linux上比較火的I/O多路復用API要算epoll了。Tornado,就是通過調用C語言封裝的epoll庫,成功解決了C10K問題(當然還有Pypy的功勞)。

在Linux里查文檔,可以看到epoll只有三類函數,調用起來比較方便易懂。

  1. 創建epoll對象,並返回其對應的文件描述符(file descriptor)。

    int epoll_create(int size); 
    int epoll_create1(int flags);
    
  2. 控制監聽事件。第一個參數epfd就對應於前面命令創建的epoll對象的文件描述符;第二個參數表示該命令要執行的動作:監聽事件的新增、修改或者刪除;第三個參數,是要監聽的文件對應的描述符;第四個,代表要監聽的事件。

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    
  3. 等候。這是一個阻塞函數,調用者會等候內核通知所注冊的事件被觸發。

    int epoll_wait(int epfd, struct epoll_event *events,
          	           int maxevents, int timeout);
    int epoll_pwait(int epfd, struct epoll_event *events,
    	             int maxevents, int timeout,
    	             const sigset_t *sigmask);
    

在Python的select庫里:

  1. select.epoll()對應於第一類創建函數;
  2. epoll.register()epoll.unregister()epoll.modify()均是對控制函數epoll_ctl的封裝;
  3. epoll.poll()則是對等候函數epoll_wait的封裝。

Python里epoll相關API的最大問題應該是在epoll.poll()。相比於其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是后者的第二個參數struct epoll_event *events。所以沒法實現精確控制。

如果一定要精確控制

那只能使用替代方案:select.select()函數。

根據Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統中select函數的直接調用,與C語言API的傳參很接近。前三個參數都是列表,其中的元素都是要注冊到內核的文件描述符。如果想用自定義類,就要確保實現了fileno()方法。

這三個參數分別對應於:

  1. rlist: 等候直到可讀
  2. wlist: 等候直到可寫
  3. xlist: 等候直到異常。這個異常的定義,要查看系統文檔。

select.select(),類似於epoll.poll(),先注冊文件和事件,然后保持等候內核通知,是阻塞函數。

select的缺點是:

  1. 復雜度為O(n)。每次都要輪詢所有注冊的描述符,然后返回結果;
  2. 每個進程只能支持注冊1024個文件描述符,因為每一個select都在進程的內存中請求固定大小的空間,使用位圖索引建立描述符集合數據結構,線程請求注冊描述符的時候,select將描述符放入其中。

而epoll的ET邊緣觸發模式則是在文件描述符可用時方才所出響應,復雜度為O(1)。

實際應用

Psycopg2庫支持對異步和協程,但和一般情況下的用法略有區別。普通數據庫連接支持不同線程中的不同游標並發查詢;而異步連接則不支持不同游標的同時查詢。所以異步連接的不同游標之間必須使用I/O復用方法來協調調度。

所以,我的大致實現思路是這樣的:首先並發執行大量協程,從任務隊列中提取任務,再向連接池請求連接,創建游標,然后執行命令,並返回結果。在獲取游標和接受查詢結果之前,均要阻塞等候內核通知連接可用。

其中,連接池返回連接時,會根據引用連接的協程數量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。

我的代碼位於:bottle-blog/dbservice.py

存在問題

當然了,這個流程目前還一些問題。

首先就是每次輪詢拿到任務之后,都會走這么一個流程。

獲取連接 --> 新建游標 --> 執行任務 --> 關閉游標 --> 取消連接引用

本來,最好的情況應該是:在輪詢之前,就建好游標;在輪詢時,直接等候內核通知,執行相應任務。這樣可以減少輪詢時的任務量。但是如果協程提前對應好連接,那就不能保證在獲取任務時,保持各連接負載均衡了。

所以這一塊,還有工作要做。

然后就是,應該建立一個緩存機制。將一定時限的查詢結果存儲起來,可以直接返回,避免短時間內重復查詢相同內容。

最后,請允許我吐槽一下Python的epoll相關文檔:簡直太弱了!!!必須看源碼才能弄清楚功能。

參考文檔

  1. 18.5. asyncio — Asynchronous I/O, event loop, coroutines and tasks
  2. epoll機制:epoll_create、epoll_ctl、epoll_wait、close
  3. epoll(7) - Linux man page
  4. cpython/Modules/selectmodule.c
  5. 18.3. select — Waiting for I/O completion
  6. Psycopg 2.6.2 documentation >> Asynchronous support
  7. psycopg2/lib/pool.py


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM