Pythpn並發編程——多線程與協程
目錄
1. 進程與線程
1.1 概念上
- 對於操作系統來說,一個任務就是一個進程Process,在一個進程內部,要同時干很多事情,就需要同時運行多個子任務,進程內的這些子任務就稱為線程Thread
- 操作系統是讓各個任務交替執行實現支持多任務的,真正地同時執行多任務需要多核CPU才能實現
- 線程是最小的執行單元,一個進程至少有一個線程,如何調讀進程和線程,完全由操作系統決定,程序自己不能決定什么時候執行,執行多長時間
1.2 多進程與多線程——同時執行多個任務
要實現多任務,設計Master-Worker模式,Master負責分配任務,Worker負責執行任務
- 多進程模式
- 啟動多個進程,每個進程只有一個線程,多個進程可以一塊執行多個任務
- 最大的優點:穩定性高,一個子進程崩潰了,不會影響主進程和其他子進程
- 缺點:創建進程的開銷大
- 多線程模式
- 啟動一個進程,在一個進程內啟動多個線程,多個線程也可以一塊執行多個任務
- 致命缺點:任何一個線程掛掉都可能造成整個進程崩潰,因為所有線程共享進程的內存
- 多進程+多線程模式
- 實際很少采用
2. 並發和並行
並發:不是指同一時刻有多個操作同時進行,實際上,在某個特定時刻,只允許有一個操作發生,線程/任務之間互相切換,直到完成,threading和asyncio
- 通常應用於I/O操作頻繁的場景,例如從網站上下載多個文件
並行:同一時刻,同時發生,multi-processing,m個處理器,開m個進程
3. Python多線程——futures
3.1 多線程用法
- 1.導入future模塊,Python中的future模塊,位於concurrent.futures 和 asyncio 中
- 2.創建線程池,函數ThreadPoolExecutor(max_workers=5),max_workers設置線程個數
- 3.調用,map()函數
import concurrent.futures
import requests
import threading
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
# 並發模式,創建了一個線性池,總共有5個線性可以分配使用
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 對sites中的每個元素,並發地調用函數download_one
executor.map(download_one, sites)
# 並行模式,創建進程池,系統自動返回CPU的數量作為可以調用的進程數
# with concurrent.futures.ProcessPoolExecutor() as executor: #
# 另一種寫法
# def download_all(sites):
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# to_do =[] #
# for site in sites:
# future = executor.submit(download_one, site)
# to_do.append(future)
# for future in concurrent.futures.as_completed(to_do):
# future.result()
def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography'
]
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print(f'Download {len(sites)} sites in {end_time - start_time} seconds')
if __name__ == '__main__':
main()
Read 182102 from https://en.wikipedia.org/wiki/Portal:Arts
Read 245181 from https://en.wikipedia.org/wiki/Portal:Society
Read 206928 from https://en.wikipedia.org/wiki/Portal:History
Read 336222 from https://en.wikipedia.org/wiki/Portal:Biography
Download 4 sites in 0.22546189799322747 seconds
3.2. 為什么多線程每次只允許只能有一個線程執行?
全局解釋器鎖的存在,GIL(Global Interpreter Lock)
3.3 多線程的缺點
- 多線程運行過程容易被打斷,因此有可能出現 race condition 的情況
- 線程切換本身存在一定的損耗,線程數不能無限增加
4. python協程——asyncio
4.1 概念
單線程的異步編程模型稱為協程。在執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接着執行,注意,這不是函數調用。
Async異步:不同操作間可以相互交替執行,如果其中的某個操作被block了,程序並不會等待,而是找出可執行的操作繼續執行
sync同步:指操作一個接一個地執行,下一個操作必須等上一個操作完成后才能執行
4.2 Asyncio原理
event loop對象維護兩個任務列表,預備狀態和等待狀態,選取預備狀態的一個任務,使其運行,一直到這個任務把控制權交還給event loop為止,當任務把控制權交還給event loop時,如果任務完成,它則將其放到預備狀態的列表,否則,放在等待狀態的列表,然后遍歷等待狀態的列表,查看它們是否完成,而原先在預備狀態列表的任務位置仍舊不變,因為它們還未運行。當所有任務被重新放置在合適的列表后,新一輪的循環又開始了。
4.3 如何使用?
-
導入內置庫 asyncio
-
async 修飾詞聲明異步函數,調用異步函數,便可得到一個協程對象
-
協程的執行:
- 通過await來調用,執行效果和正常執行一樣,會阻塞在這里,進入被調用的協程函數,執行完畢返回后再繼續
- asynio.creat_task(調用異步函數) 創建任務
- asynio.run(main())作為主程序的入口函數,觸發運行
import asyncio
async def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time) # 從當前任務切出,事件調讀器開始調度
print('OK {}'.format(url)) # 任務完成后,從事件循環中退出
async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls] # 列表生成式
for task in tasks: # 多個任務被創建,進入事件循環等待運行
await task # 執行,用戶選擇從當前主任務中切出,事件調度器開始調度
# await asyncio.gather(*tasks)
asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
########## 輸出 ##########
crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4
import asyncio
async def worker_1():
await asyncio.sleep(1) # 4.從當前任務切出,事件調讀器開始調度任務2
return 1 # 7.1秒后,事件調讀器將控制權重新傳給任務1,返回1,任務1完成,從事件循環中退出,並把控制器傳給主任務
async def worker_2(): # 協程運行時出現錯誤
await asyncio.sleep(2) #5.從當前任務切出,事件調讀器開始調度任務3,
return 2 / 0 # 8.2秒后,事件調讀器將控制器重新傳給任務2,運行出錯,從事件循環中退出,控制器傳給主任務
async def worker_3():
await asyncio.sleep(3) # 6.從當前任務中切出,事件調讀器暫停調度
return 3 # 9.觸發限定運行規則,任務3被取消,退出事件循環
async def main():
task_1 = asyncio.create_task(worker_1()) # 2.任務123被創建,並進入事件循環等待運行
task_2 = asyncio.create_task(worker_2())
task_3 = asyncio.create_task(worker_3())
await asyncio.sleep(2) # 給任務3限定運行時間,一旦超時就取消
task_3.cancel()
res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True) # 3.執行任務,用戶選擇從當前的主任務中切出,事件調讀器開始調度任務1
print(res) # 10.主任務輸出res,協程任務結束,事件循環結束
asyncio.run(main()) # 1.程序進入main()函數,事件循環開啟
########## 輸出 ##########
[1, ZeroDivisionError('division by zero'), CancelledError()]
4.4. 協程的優點
- 協程是單線程的,但其內部 event loop 的機制,可以讓它並發地運行多個不同的任務,並且比多線程享有更大的自主控制權。
- Asyncio 中的任務,在運行過程中不會被打斷,因此不會出現 race condition 的情況
- 子程序切換不是線程切換,而是由程序自身控制,在哪些地方交出控制權,切換到下一個任務,因此沒有線程切換的開銷,具有極高的執行效率
- 協程的寫法更加清晰簡潔,把async/await 語法和 create_task結合來用,對於中小級別的並發需求已經毫無壓力
- 很多情況下,使用 Asyncio 需要特定第三方庫的支持(缺點),例如不支持requests,但可以用aiohttp
6. 選擇多線程還是協程
- 如果是 I/O bound,並且 I/O 操作很慢,需要很多任務 / 線程協同實現,那么使用 Asyncio 更合適。
- 如果是 I/O bound,但是 I/O 操作很快,只需要有限數量的任務 / 線程,那么使用多線程就可以了。
- 如果是 CPU bound,則需要使用多進程來提高程序運行效率