1、常見並發類型
I/ O密集型:
藍色框表示程序執行工作的時間,紅色框表示等待I/O操作完成的時間。此圖沒有按比例顯示,因為internet上的請求可能比CPU指令要多花費幾個數量級的時間,所以你的程序可能會花費大部分時間進行等待。
CPU密集型:
IO密集型程序將時間花在cpu計算上。
常見並發類型以及區別:
2、同步版本
我們將使用requests訪問100個網頁,使用同步的方式,requests的請求是同步的,所以代碼就很好寫了。
同步的版本代碼邏輯簡單,編寫也會很相對容易。
import requests import time def download_site(url,session): with session.get(url) as response: print(len(response.content)) def download_all_site(sites): with requests.Session() as session: for url in sites: download_site(url,session) if __name__ =="__main__": sites = ["https://www.baidu.com","https://www.jython.org"] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print("執行時間:%s" % (end_time - start_time) + "秒") #download_site()只從一個URL下載內容並打印其大小 #需要知道的是我們這里沒有使用requests.get(),而使用了session.get(),我們使用requests.Session()創建了一個Session對象,每次請求使用了session.get(url,因為可以讓requests運用一些神奇的網絡小技巧,從而真正使程序加速。 #執行時間:33.91123294830322秒
3、多線程
ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。
你已經了解了Thread部分。那只是我們之前提到的一個思路。Pool部分是開始變得有趣的地方。這個對象將創建一個線程池,其中的每個線程都可以並發運行。最后,Executor是控制線程池中的每個線程如何以及何時運行的部分。它將在線程池中執行請求。
對我們很有幫助的是,標准庫將ThreadPoolExecutor實現為一個上下文管理器,因此你可以使用with語法來管理Threads池的創建和釋放。
一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每個站點上運行傳入函數。最重要的是,它使用自己管理的線程池自動並發地運行它們。
來自其他語言,甚至Python 2的人可能想知道,在處理threading時,管理你習慣的細節的常用對象和函數在哪里,比如Thread.start()、Thread.join()和Queue。
這些都還在那里,你可以使用它們來實現對線程運行方式的精細控制。但是,從Python 3.2開始,標准庫添加了一個更高級別的抽象,稱為Executor,如果你不需要精細控制,它可以為你管理許多細節。
本例中另一個有趣的更改是,每個線程都需要創建自己的request . Session()對象。當你查看requests的文檔時,不一定就能很容易地看出,但在閱讀這個問題(https://github.com/requests/requests/issues/2766 )時,你會清晰地發現每個線程都需要一個單獨的Session。
這是threading中有趣且困難的問題之一。因為操作系統可以控制任務何時中斷,何時啟動另一個任務,所以線程之間共享的任何數據都需要被保護起來,或者說是線程安全的。不幸的是,requests . Session()不是線程安全的。
根據數據是什么以及如何你使用它們,有幾種策略可以使數據訪問變成線程安全的。其中之一是使用線程安全的數據結構,比如來自 Python的queue模塊的Queue。
這些對象使用低級基本數據類型,比如threading.Lock,以確保只有一個線程可以同時訪問代碼塊或內存塊。你可以通過ThreadPoolExecutor對象間接地使用此策略。
import requests import concurrent.futures import threading import time #創建線程池 thread_local= threading.local() def get_session(): if not getattr(thread_local,"session",None): thread_local.session = requests.Session() return thread_local.session def download_site(url): session = get_session() with session.get(url) as response: print(len(response.content)) def download_all_site(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exector: exector.map(download_site,sites) if __name__ =="__main__": sites = ["https://www.baidu.com","https://www.jython.org"] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print("執行時間:%s" % (end_time - start_time) + "秒") #執行時間:6.152076244354248秒
這里要使用的另一種策略是線程本地存儲。Threading.local()會創建一個對象,它看起來像一個全局對象但又是特定於每個線程的。在我們的示例中,這是通過threadLocal和get_session()完成的:
ThreadLocal是threading模塊中專門用來解決這個問題的。它看起來有點奇怪,但是你只想創建其中一個對象,而不是為每個線程創建一個對象。對象本身將負責從不同的線程到不同的數據的分開訪問。
當get_session()被調用時,它所查找的session是特定於它所運行的線程的。因此,每個線程都將在第一次調用get_session()時創建一個單個的會話,然后在整個生命周期中對每個后續調用使用該會話。
最后,簡要介紹一下選擇線程的數量。你可以看到示例代碼使用了5個線程。隨意改變這個數字,看看總時間是如何變化的。你可能認為每次下載只有一個線程是最快的,但至少在我的系統上不是這樣。我在5到10個線程之間找到了最快的結果。如果超過這個值,那么創建和銷毀線程的額外開銷就會抵消程序節省的時間。
這里比較困難的答案是,從一個任務到另一個任務的正確線程數不是一個常量。需要進行一些實驗來得到。
注意:request . Session()不是線程安全的。這意味着,如果多個線程使用同一個Session,那么在某些地方可能會發生上面描述的交互類型問題。
多線程代碼的執行時序表:
4、異步IO
asyncio的一般概念是一個單個的Python對象,稱為事件循環,它控制每個任務如何以及何時運行。事件循環會關注每個任務並知道它處於什么狀態。在實際中,任務可以處於許多狀態,但現在我們假設一個簡化的只有兩種狀態的事件循環。
就緒狀態將表明一個任務有工作要做,並且已經准備好運行,而等待狀態意味着該任務正在等待一些外部工作完成,例如網絡操作。
我們簡化的事件循環維護兩個任務列表,每一個對應這些狀態。它會選擇一個就緒的任務,然后重新啟動它。該任務處於完全控制之中,直到它配合地將控制權交還給事件循環為止。
當正在運行的任務將控制權交還給事件循環時,事件循環將該任務放入就緒或等待列表中,然后遍歷等待列表中的每個任務,以查看I/O操作完成后某個任務是否已經就緒。時間循環知道就緒列表中的任務仍然是就緒的,因為它知道它們還沒有運行。
一旦所有的任務都重新排序到正確的列表中,事件循環將選擇下一個要運行的任務,然后重復這個過程。我們簡化的事件循環會選擇等待時間最長的任務並運行該任務。此過程會一直重復,直到事件循環結束。
asyncio的一個重要之處在於,如果沒有刻意去釋放控制權,任務是永遠不會放棄控制權的。它們在操作過程中從不會被打斷。這使得我們在asyncio中比在threading中能更容易地共享資源。你不必擔心代碼是否是線程安全的。
import time import asyncio from aiohttp import ClientSession async def download_site(session,url): global i try: async with session.get(url) as response: i=i+1 print(i) return await response.read() except Exception as e: pass async def download_all_site(sites): async with ClientSession() as session: tasks = [] for url in sites: task = asyncio.create_task(download_site(session,url)) tasks.append(task) result = await asyncio.gather(*tasks) #等待一組協程運行結束並接收結果 print(result) if __name__ =="__main__": i=0 sites = ["http://www.360kuai.com/","https://www.jython.org"] * 50 start_time = time.time() asyncio.run(download_all_site(sites)) end_time = time.time() print("執行時間:%s" % (end_time - start_time) + "秒")
#執行時間:5.29184889793396秒
異步IO的執行時序表:
asyncio版本的問題
此時asyncio有兩個問題。你需要特殊的異步版本的庫來充分利用asycio。如果你只是使用requests下載站點,那么速度會慢得多,因為requests的設計目的不是通知事件循環它被阻塞了。隨着時間的推移,這個問題變得微不足道,因為越來越多的庫包含了asyncio。
另一個更微妙的問題是,如果其中一個任務不合作,那么協作多任務處理的所有優勢都將不存在。代碼中的一個小錯誤可能會導致任務運行超時並長時間占用處理器,使需要運行的其他任務無法運行。如果一個任務沒有將控制權交還給事件循環,則事件循環無法中斷它。
考慮到這一點,我們來開始討論一種完全不同的並發性——multiprocessing。