並發的意義
為了高效處理網絡I/O,需要使用並發,因為網絡有很高的延遲,所以為了不浪費CPU周期去等待,最好在收到網絡響應之前做些其他的事。
在I/O密集型應用中,如果代碼寫得正確,那么不管是用哪種並發策略(使用線程或asyncio包),吞吐量都比依序執行的代碼高很多。
並發是指一次處理多件事。並行是指一次做多件事。一個關於結構,一個關於執行。
並行才是我們通常認為的那個同時做多件事情,而並發則是在線程這個模型下產生的概念。
並發表示同時發生了多件事情,通過時間片切換,哪怕只有單一的核心,也可以實現“同時做多件事情”這個效果。
根據底層是否有多處理器,並發與並行是可以等效的,這並不是兩個互斥的概念。
舉個我們開發中會遇到的例子,我們說資源請求並發數達到了1萬。這里的意思是有1萬個請求同時過來了。但是這里很明顯不可能真正的同時去處理這1萬個請求的吧!
如果這台機器的處理器有4個核心,不考慮超線程,那么我們認為同時會有4個線程在跑。
也就是說,並發訪問數是1萬,而底層真實的並行處理的請求數是4。
如果並發數小一些只有4的話,又或者你的機器牛逼有1萬個核心,那並發在這里和並行一個效果。
也就是說,並發可以是虛擬的同時執行,也可以是真的同時執行。而並行的意思是真的同時執行。
結論是:並行是我們物理時空觀下的同時執行,而並發則是操作系統用線程這個模型抽象之后站在線程的視角上看到的“同時”執行。
Future
一、初識future
concurrent.futures 模塊主要特色是:ThreadPoolEXecutor和 ProcessPoolExecutor類,這兩個類實現的接口能分別在不同的線程或進程中執行可調用的對象。
這兩個類在內部維護着一個工作線程或進程池,以及要執行的任務隊列。
from concurrent import futures MAX_WORKERS = 20 def download_many(): workers = min(MAX_WORKERS,len(url_list)) with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_one,sorted(url_list))
return len(list(res))
(1)設定工作的線程數量,使用允許的最大值與要處理的數量之間的較小的那個值,以免創建過於的線程。
(2)download_one函數在多個線程中並發調用,map方法返回一個生成器,因此可以迭代,獲取各個函數返回的值。
future是concurrent.futures模塊和asyncio包的重要組件。
從python3.4開始標准庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future
這兩個類的作用相同:兩個Future類的實例都表示可能完成或者尚未完成的延遲計算。與Twisted中的Deferred類、Tornado框架中的Future類的功能類似
future封裝待完成的操作,可以放入隊列,完成的狀態可以查詢,得到結果(或拋出異常)后可以獲取結果(或異常)。
▲ 通常情況下自己不應該創建future,只能由並發框架(concurrent.future或asyncio)實例化。
future表示終將發生的事情,而確定某件事會發生的唯一方式就是執行的時間已經排定。
只有排定把某件事交給concurrent.futures.Executor子類處理時,才會創建concurrent.futures.Future實例。
Executor.submit(fn, *args, **kwargs)
Executor.submit() 方法的參數是一個可調用的對象,調用這個方法后會為傳入的可調用對象排期,返回一個future。
▲ 不是阻塞的,而是立即返回。能夠使用 done()方法判斷該任務是否結束。
使用cancel()方法可以取消提交的任務,如果任務已經在線程池中運行了,就取消不了。
客戶端代碼不應該改變future的狀態,並發框架在future表示的延遲計算結束后會改變future狀態。而我們無法控制計算何時結束。
Executor.shutdown(wait=True)
釋放系統資源,在Executor.submit()或 Executor.map()等異步操作后調用。使用with語句可以避免顯式調用此方法。
shutdown(wait=True) 相當於進程池的 pool.close()+pool.join() 操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續,--------》默認
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
Executor.add_done_callback(fn)
future都有 .add_done_callback(fn) 方法,這個方法只有一個參數,類型是可調用的對象,future運行結束后會調用指定的可調用對象。
fn接收一個future參數,通過obj.result(),獲得執行后結果。
Executor.result()
.result()方法,在future運行結束后調用的話,返回可調用對象的結果,或者重新拋出執行可調用的對象時拋出的異常。
如果沒有運行結束,concurrent會阻塞調用方直到有結果可返回。
concurrent.futures.as_completed()
使用concurrent.futures.as_completed函數,這個函數的參數是一個future列表 / future為key的字典,返回值是一個生成器,
在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務future,就能執行for循環下面的語句,然后繼續阻塞,循環到所有的任務結束。
從結果也可以看出,先完成的任務會先通知主線程。
Executor.map(func, *iterables, timeout=None)
Executor.map() 返回值是一個迭代器,迭代器的__next__方法調用各個future的result()方法,得到各個future的結果而不是future本身。
*iterables:可迭代對象,如列表等。每一次func執行,都會從iterables中取參數。
timeout:設置每次異步操作的超時時間
修改Executor.map調用,換成兩個for循環,一個用於創建並排定future,另一個用於獲取future的結果
def download_many(): with futures.ThreadPoolExecutor(max_workers=3) as executor: to_do = [] for cc in sorted(url_list): future = executor.submit(download_one,cc) to_do.append(future) result = [] for future in futures.as_completed(to_do): res = future.result() result.append(res)
executor.submit() 方法排定可調用對象的執行時間,然后返回一個future,表示這個待執行的操作。
示例中的future.result()方法絕不會阻塞,因為future由as_completed函數產出。
▲ 同時在 future.result()處使用 try模塊捕獲異常
二、阻塞型I/O和GIL
Cpython解釋器本身就不是線程安全的,因此有全局解釋器鎖(GIL),一次只允許使用一個線程執行Python字節碼。因此,一個Python進程通常不能同時使用多個CPU核心。
標准庫中所有執行阻塞型I/O操作的函數,在等待操作系統返回結果時都會釋放GIL。I/O密集型Python程序能從中受益。
一個Python線程等待網絡響應時,阻塞型I/O函數會釋放GIL,再運行一個線程。
三、ProcessPoolExecutor
ProcessPoolExecutor 和 ThreadPoolExecutor類都實現了通用的Executor接口,因此使用concurrent.futures模塊能特別輕松地把基於線程的方案轉成基於進程的方案。
ThreadPoolExecutor.__init__方法需要max_workers參數,指定線程池中線程的數量。(10、100或1000個線程)
ProcessPoolExecutor類中這個參數是可選的,而且大多數情況下不使用,默認值是os.cpu_count()函數返回的CPU數量。四核CPU,因此限制只能有4個並發。而線程池版本可以有上百個。
ProcessPoolExecutor類把工作分配給多個Python進程處理,因此,如果需要做CPU密集型處理,使用這個模塊能繞開GIL,利用所有的CPU核心。
其原理是一個ProcessPoolExecutor創建了N個獨立的Python解釋器,N是系統上面可用的CPU核數。
使用方法和ThreadPoolExecutor方法一樣
from time import sleep,strftime from concurrent import futures def display(*args): print(strftime('[%H:%M:%S]'),end=' ') print(*args) def loiter(n): msg = '{}loiter({}): doing nothing for {}s' display(msg.format('\t'*n,n,n)) sleep(n*2) msg = '{}loiter({}): done.' display(msg.format('\t'*n,n)) return n *10 def main(): display('Script starting...') executor = futures.ThreadPoolExecutor(max_workers=3) results = executor.map(loiter,range(5)) display('result:',results) display('Waiting for individual results:') for i,result in enumerate(results): display('result {}:{}'.format(i,result)) main()
Executor.map函數返回結果的順序與調用時開始的順序一致。
如果第一個調用生成結果用時10秒,而其他調用只用1秒,代碼會阻塞10秒,獲取map方法返回的生成器產出的第一個結果。
在此之后,獲取后續結果不會阻塞,因為后續的調用已經結束。
如果需要不管提交的順序,只要有結果就獲取,使用 Executor.submit() 和 Executor.as_completed() 函數。
四、顯示下載進度條
TQDM包特別易於使用。
from tqdm import tqdm import time for i in tqdm(range(1000)): time.sleep(.01)
tqdm函數能處理任何可迭代的對象,生成一個迭代器。
使用這個迭代器時,顯示進度條和完成全部迭代預計的剩余時間。
為了計算剩余時間,tqdm函數要獲取一個能使用len函數確定大小的可迭代對象,或者在第二個參數中指定預期的元素數量。
如:iterable = tqdm.tqdm(iterable, total=len(xx_list))
Asyncio
一、使用asyncio包處理並發
這個包主要使用事件循環的協程實現並發。
import asyncio import itertools import sys @asyncio.coroutine def spin(msg): write,flush = sys.stdout.write,sys.stdout.flush for char in itertools.cycle('|/-\\'): status = char + ' ' +msg write(status) flush() write('\x08'*len(status)) try: yield from asyncio.sleep(.1) except asyncio.CancelledError: break write(' '*len(status) + '\x08'*len(status)) @asyncio.coroutine def slow_function(): yield from asyncio.sleep(3) return 42 @asyncio.coroutine def supervisor(): spinner = asyncio.async(spin('thinking')) print('spinner object:',spinner) result = yield from slow_function() spinner.cancel() return result def main(): loop = asyncio.get_event_loop() result = loop.run_until_complete(supervisor()) loop.close() print('Answer:',result)
(1)打算交給asyncio處理的協程要使用@asyncio.coroutine裝飾。
(2)使用yield from asyncio.sleep 代替 time.sleep,這樣休眠不會阻塞事件循環。
(3)asyncio.async(...)函數排定spin協程的運行時間,使用一個Task對象包裝spin協程,並立即返回。
(4)獲取事件循環的引用,驅動supervisor協程。
▲ 如果寫成需要在一段時間內什么也不做,應該使用yield from asyncio.sleep(DELAY)
asyncio.Task對象差不多與threading.Thread對象等效,Task對象像是實現協作式多任務的庫(如:gevent)中的綠色線程(green thread)
獲取的Task對象已經排定了運行時間,Thread實例必須調用start方法,明確告知讓他運行。
沒有API能從外部終止線程,因為線程隨時可能被中斷,導致系統處於無效狀態。
如果想要終止任務,使用Task.cancel()實例方法,拋出CancelledError異常。協程可以在暫停的yield處捕獲這個異常,處理終止請求。
二、asyncio.Future 與 concurrent.futures.Future
asyncio.Future 與 concurrent.futures.Future類的接口基本一致,不過實現方式不同,不可以互換。
future只是調度執行某物的結果。
在asyncio包中,BaseEventLoop.create_task(...)方法接收一個協程,排定它的運行時間,然后返回一個asyncio.Task實例,也是asyncio.Future類的實例,因為Task是Future的子類,用於包裝協程。
asyncio.Future類的目的是與yield from一起使用,所以通常不需要使用以下方法。
(1)無需調用my_future.add_done_callback(...),因為可以直接把想在future運行結束后執行的操作放在協程中yield from my_future表達式的后面,
(2)無需調用my_future.result(),因為yield from從future中產出的值就是結果(result = yield from my_future)。
asyncio.Future對象由yield from驅動,而不是靠調用這些方法驅動。
獲取Task對象有兩種方式:
(1)asyncio.async(coro_or_future, *, loop=None),
第一個參數如果是Future或者Task對象,返回。如果是協程,那么async函數會調用loop.create_task(...)方法創建Task對象。
(2)BaseEventLoop.create_task(coro),
排定協程的執行時間,返回一個asyncio.Task對象。
三、asyncio和aiohttp
asyncio包只直接支持TCP和UDP。如果想使用HTTP或其他協議,那么要借助第三方包。
import asyncio import aiohttp @asyncio.coroutine def get_flag(url): resp = yield from aiohttp.request('GET',url) data = yield from resp.read() return data @asyncio.coroutine def download_one(url): data = yield from get_flag(url) return url def download_many(): loop = asyncio.get_event_loop() to_do = [download_one(url) for url in sorted(url_list)] wait_coro = asyncio.wait(to_do) res,_ = loop.run_until_complete(wait_coro) loop.close() return len(res)
阻塞的操作通過協程實現,客戶代碼通過yield from把職責委托給協程,以便異步運行協程。
構建協程對象列表。
asyncio.wait是一個協程,等傳給它的所有協程運行完畢后結束。wait函數默認行為。
loop.run_until_complete(wait_coro)執行事件循環。直到wait_coro運行結束;時間循環運行的過程中,這個腳本會在這里阻塞。
asyncio.wait函數運行結束后返回一個元組,第一個元素是一系列結束的future,第二個元素是一系列未結束的future。
(如果設置了timeout和return_when 就會返回未結束的future)
▲ 為了使用asyncio包,必須把每個訪問網絡的函數改成異步版,使用yield from處理網絡操作,這樣才能把控制權交還給事件循環。
總結:
(1)我們編寫的協程鏈條始終是通過把最外層委派生成器傳給asyncio包API中的某個函數(如loop.run_until_complete(...))驅動。
由asyncio包實現next(...)或.send(...)
(2)我們編寫的協程鏈條始終通過yield from把職責委托給asyncio包中的某個協程函數或協程方法(yield from asyncio.sleep(...)),或者其他庫中實現高層協議的協程(yield from aiohttp.request(...)),
也就是說最內層的子生成器是庫中真正執行I/O操作的函數,而不是我們自己編寫的函數。
四、asyncio與進度條結合
由loop.run_until_complete方法驅動,全部協程運行完畢后,這個函數會返回所有下載結果。
可是,為了更新進度條,各個協程運行結束后就要立即獲取結果。
import asyncio import aiohttp from tqdm import tqdm import collections @asyncio.coroutine def get_flag(url): resp = yield from aiohttp.request('GET',url) data = yield from resp.read() return data @asyncio.coroutine def download_one(url,semaphore): try: with (yield from semaphore): data = yield from get_flag(url) except Exception as exc: '''''' else: save_data(data) return url @asyncio.coroutine def download_coro(url_list,concur_req): counter = collections.Counter() semaphore = asyncio.Semaphore(concur_req) to_do = [download_one(url,semaphore) for url in url_list] to_do_iter = asyncio.as_completed(to_do) to_do_iter = tqdm(to_do_iter,total=len(url_list)) for future in to_do_iter: try: res = yield from future except Exception as exc: '''''' counter[status] += 1 return counter def download_many(): loop = asyncio.get_event_loop() coro = download_coro(url_list,concur_req) res = loop.run_until_complete(coro) loop.close() return res
(1)使用某種限流機制,防止向服務器發起太多並發請求,使用ThreadPoolExecutor類時可以通過設置線程池數量;
(2)asyncio.Semaphore對象維護這一個內部計數器,把semaphore當做上下文管理器使用。保證任何時候都不會有超過X個協程啟動。
(3)asyncio.as_completed(xxx),獲取一個迭代器,這個迭代器會在future運行結束后返回future。
(4)迭代運行結束的future,獲取asyncio.Future對象的結果,使用yield from,而不是future.result()方法。
(5)不能使用字典映射方式,因為asyncio.as_completed函數返回的future與傳給as_completed函數的future可能不同。在asyncio包內部,我們提供的future會被替換成生成相同結果的future。
五、使用Executor對象,防止阻塞事件循環
上述示例中,save_data(...),會執行硬盤I/O操作,而這應該異步執行。
在線程版本中,save_data(...)會阻塞download_one函數的線程,但是阻塞的只是眾多工作線程中的一個。
阻塞型I/O調用在背后會釋放GIL,因此另一個線程可以繼續。
但是在asyncio中,save_data(...)函數阻塞了客戶代碼與asyncio事件循環共用的唯一線程,因此保存文件時,整個應用程序都會凍結。
asyncio的事件循環在背后維護者一個ThreadPoolExecutor對象,我們可以調用run_in_executor方法,把可調用對象發給它執行。
@asyncio.coroutine def download_one(url,semaphore): try: with (yield from semaphore): data = yield from get_flag(url) except Exception as exc: '''''' else: loop = asyncio.get_event_loop() loop.run_in_executor(None, save_data, data) return url
(1)獲取事件循環對象的引用。
(2)run_in_executor方法的第一個參數是Executor實例;如果設為None,使用事件循環的默認ThreadPoolExecutor實例。
(3)余下參數是可調用的對象,以及可調用對象的位置參數。
每次下載發起多次請求:

@asyncio.coroutine def get_flag(url): resp = yield from aiohttp.request('GET',url) data = yield from resp.read() json = yield from resp.json() return data @asyncio.coroutine def download_one(url,semaphore): try: with (yield from semaphore): flag = yield from get_flag(url) with (yield from semaphore): country = yield from get_country(url) except Exception as exc: '''''' return url
六、使用asyncio包編寫服務器