眾所周知,python的多線程開發在GIL(全局器解釋鎖)下飽受詬病,在單核模式下搞多線程對效率的提升相當有限。於是大家的共識就是搞io密集的程序,建議采用多線程,計算密集型的程序就搞多進程。近期的一些開發經歷,讓我大量嘗試采用多進程和異步io的方式來提高效率。
一.采用多進程。
1.用過multiprocessing.process和queue及pool,但是一直有報錯,位置在multiprocessing.spawn的_main(fd,parent_sentinel)中reduction.pickle.load(from_parent)。網上提供了各種諸如修改權限,修改reduction協議類型的方案,始終未能解決。最后竟然是更新了python版本后不葯而愈,但最后放棄了這種方式。
2.使用concurrent.futures.ProcessPoolExecutor,官方文檔很詳細。程序運行也很不錯,但是在腳本執行完畢后退出時拋出異常。Error in atexit._run_exitfuncs。大致的內容是handle is closed,通過追蹤大致可以判斷出腳本執行完畢時,會有futures.process的_python_exit()執行,此時ProcessPoolExecutor執行完畢后釋放,沒有線程可以被wakeup,所以報錯。引起這個問題的是官方推薦的with as寫法,with as 執行完畢后會將對象釋放,結果在退出的時候引發異常。
解決方案也很反常,不使用with as的自動釋放,也不使用shutdown手動釋放,而是不釋放,在整個腳本執行完畢的時候,由_python_exit()進行釋放。
有意思的是concurrent.futures.process文檔里有這么一段注釋,可以自行研究。
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
二.采用異步io。
python在3.4中有了asyncio。官方給的示例可以看出來這是個消息循環,eventloop是在當前的上下文中提供的,而且可以和協程一起執行。最新的文檔里可以看到eventloop已經不再只是同協程一起提供異步了。
我們可以將多個耗時的任務進行封裝,丟進eventloop中,當線程執行到需要等待的操作如io,主線程不會等待,而是切換執行到下一個可執行任務,因此可以實現並發執行。
三.多進程和異步io結合。
經過嘗試多線程中沒有找到使用異步io的方式,因為eventloop是從當前上下文中提供的,也就是主線程。於是換了個思路,使用多進程,讓每個進程的‘主線程’通過異步io來實現並發,經多次嘗試后這種方案是可行的。
因為同時運行的進程數量上限是受cpu內核數量的上限影響的,一般的建議是不超過內核數量,但是在一些場景的限制下為了提高效率,單純受限的多進程不能滿足要求的時候,不妨將多進程的‘主進程’結合並發來提高效率。
由於沒有實際測試性能,不能斷言究竟哪種方式效率更高,畢竟也與任務內容有關。這只是一種思路,留待日后驗證。
代碼如下:
import asyncio import aiohttp from concurrent.futures import ProcessPoolExecutor, as_completed ,ThreadPoolExecutor import time async def post_http(): # 示例 url = '' data = '' async with aiohttp.ClientSession() as session: async with session.post(url=url, data=data, headers={}, timeout=60) as resp: r_json = await resp.json() return r_json async def t_handler(data, t_flag, p_flag, semaphore): async with semaphore: for d in data: print(f'pid:{p_flag} tid:{t_flag} data:{d}') await asyncio.sleep(1) # 處理費時的io操作,比如httprequest return def p_handler(datas, p_flag): # 線程並發數需要有限制 linux打開文件最大默認為1024 win為509 待確認 ts = time.time() num = 10 # 最大並發數 count = len(datas) block = int(count / num) + 1 tar_datas = [datas[i * block: (i + 1) * block if (i + 1) * block < count else count] for i in range(num)] semaphore = asyncio.Semaphore(num) tasks = [t_handler(d, i, p_flag, semaphore) for i, d in enumerate(tar_datas)] loop = asyncio.get_event_loop() # 基於當前線程 ,故在多線程中無法使用 只能在多進程中使用 loop.run_until_complete(asyncio.wait(tasks)) loop.close() return f'\033[0;32mprocess {p_flag} :cost {time.time() - ts}\033[0m' if __name__ == '__main__': ts = time.time() datas = [i for i in range(1000)] datas = [datas[i * 100:(i + 1) * 100] for i in range(10)] # 每個進程要處理的數據 # 啟動異步io 主線程調用 event_loop 在當前線程下啟動異步io 實現並發 # res = p_handler(datas,1) # print(res) p_num = 10 block_len = 100 datas = [datas[i * 100:(i + 1) * 100] for i in range(p_num)] # 每個進程要處理的數據 # ProcessPoolExecutor 可能與運行環境有關 官方的 with as 會主動釋放線程 導致主線程退出時找不到進程池內進程已經被釋放 導致Error in atexit._run_exitfuncs異常 executor = ProcessPoolExecutor(p_num) futures = [executor.submit(p_handler, d, p_flag) for p_flag, d in enumerate(datas)] for f in as_completed(futures): if f.done(): res = f.result() print(res) print(f'Exit!! cost:{time.time() - ts}')