Advanced Python: Concurrent Programming with Futures


Distinguishing Concurrency from Parallelism

  Concurrency

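  The standard Python interpreter (CPython) is not thread-safe. To avoid the race conditions this would cause, Python introduced the global interpreter lock (GIL), which allows only one thread to execute at any given moment. When a thread blocks on an I/O operation, the GIL is released so that another thread can continue running. So in Python, concurrency does not mean that multiple operations (threads, tasks) run at the same instant. Only one thread or task executes at a time, and they take turns.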
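  The hand-off described above can be observed without any network access. The following is a minimal sketch, not part of the original article: time.sleep stands in for a blocking I/O call, and the names blocking_task and run_in_threads are made up for illustration. Because each thread gives up the GIL while it waits, three 0.3-second waits finish in roughly 0.3 seconds rather than 0.9.

```python
import threading
import time

def blocking_task(name, seconds, log):
    # time.sleep is an assumed stand-in for blocking I/O;
    # CPython releases the GIL while the thread waits.
    time.sleep(seconds)
    log.append(name)

def run_in_threads(delays):
    """Start one thread per delay and return (elapsed seconds, names that finished)."""
    log = []
    threads = [
        threading.Thread(target=blocking_task, args=(f"task-{i}", d, log))
        for i, d in enumerate(delays)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start, log

if __name__ == "__main__":
    elapsed, log = run_in_threads([0.3, 0.3, 0.3])
    print(f"3 tasks of 0.3s each finished in {elapsed:.2f}s: {log}")
```

  Only one thread runs Python code at any instant, yet the waits overlap, which is why this style of concurrency pays off for I/O.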
  

  Parallelism

  Parallelism means that multiple processes execute truly simultaneously, for example on separate CPU cores.
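  As a rough illustration, a CPU-heavy function can be spread over several processes with ProcessPoolExecutor. This is a sketch under assumptions: count_primes and count_primes_parallel are hypothetical helpers, and the workload is chosen only because it keeps the CPU busy.

```python
import concurrent.futures
import math

def count_primes(limit):
    """Count primes below limit by trial division (deliberately CPU-heavy)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count

def count_primes_parallel(limits):
    # Each worker is a separate process with its own interpreter and its own GIL,
    # so the calls can run on different cores at the same time.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        return list(executor.map(count_primes, limits))

# Example call (run as a script, under the usual main guard):
#   if __name__ == "__main__":
#       print(count_primes_parallel([20_000, 20_000, 20_000, 20_000]))
```

  The main guard matters on platforms that start worker processes by re-importing the script; without it, each worker would try to create a pool of its own.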
  
 

Concurrent Programming with Futures

  Single-threaded vs. multi-threaded performance

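  Suppose our task is to download the contents of a number of web pages and print the size of each. A single-threaded implementation looks like this: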
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))
    
def download_all(sites):
    for site in sites:
        download_one(site)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    main()

# Output
Read 129196 from https://en.wikipedia.org/wiki/Portal:Arts
Read 183867 from https://en.wikipedia.org/wiki/Portal:History
Read 224161 from https://en.wikipedia.org/wiki/Portal:Society
Read 114387 from https://en.wikipedia.org/wiki/Portal:Biography
Read 152871 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 156339 from https://en.wikipedia.org/wiki/Portal:Technology
Read 162872 from https://en.wikipedia.org/wiki/Portal:Geography
Read 91504 from https://en.wikipedia.org/wiki/Portal:Science
Read 323262 from https://en.wikipedia.org/wiki/Computer_science
Read 391073 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 319710 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 470754 from https://en.wikipedia.org/wiki/PHP
Read 180774 from https://en.wikipedia.org/wiki/Node.js
Read 56799 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 325451 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 67.349395015 seconds
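  The code above works as follows: iterate over the list of sites; download the current site; once that download completes, do the same for the next site, and so on until the list is exhausted.
  Next, the multi-threaded version: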
import concurrent.futures
import requests
import time

def download_one(url):
    try:
        resp = requests.get(url)
        print('Read {} from {}'.format(len(resp.content), url))
    except Exception as ex:
        print(ex)

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(download_one, sites)
    # with concurrent.futures.ProcessPoolExecutor() as executor:
    #     results = executor.map(download_one,sites)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

# Output
Read 114387 from https://en.wikipedia.org/wiki/Portal:Biography
Read 129196 from https://en.wikipedia.org/wiki/Portal:Arts
Read 183867 from https://en.wikipedia.org/wiki/Portal:History
Read 152871 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 224161 from https://en.wikipedia.org/wiki/Portal:Society
Read 156339 from https://en.wikipedia.org/wiki/Portal:Technology
Read 91504 from https://en.wikipedia.org/wiki/Portal:Science
Read 391073 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 162872 from https://en.wikipedia.org/wiki/Portal:Geography
Read 323262 from https://en.wikipedia.org/wiki/Computer_science
Read 56799 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 319710 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 325451 from https://en.wikipedia.org/wiki/Go_(programming_language)
Read 180774 from https://en.wikipedia.org/wiki/Node.js
Read 470754 from https://en.wikipedia.org/wiki/PHP
Download 15 sites in 10.022916933 seconds
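  This version is roughly 6.7 times faster (about 10 seconds instead of 67). ThreadPoolExecutor creates a thread pool, and max_workers=5 gives it five worker threads. executor.map(download_one, sites) calls download_one concurrently on each element of sites. requests.get() is thread-safe, so it can be used safely in a multi-threaded environment. The number of threads is configurable, but too many threads increase system overhead. Benchmark against your actual workload to find the best thread count.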
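  One way to look for a reasonable thread count is to time the same workload under several max_workers values. The sketch below is illustrative only: fake_download is a hypothetical stand-in that sleeps instead of calling requests.get(), and the example.com URLs are placeholders, so the timings say nothing about a real network.

```python
import concurrent.futures
import time

def fake_download(url):
    # Hypothetical stand-in for requests.get(url): sleep instead of using the network.
    time.sleep(0.05)
    return len(url)

def timed_run(urls, max_workers):
    """Run fake_download over urls and return (elapsed seconds, results)."""
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of the inputs,
        # regardless of which call finishes first.
        results = list(executor.map(fake_download, urls))
    return time.perf_counter() - start, results

if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(10)]
    for workers in (1, 2, 5, 10):
        elapsed, _ = timed_run(urls, workers)
        print(f"max_workers={workers:>2}: {elapsed:.2f}s")
```

  With a real download function, the gains usually flatten out at some point as overhead and the remote server's limits take over, and that point is the value worth keeping.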
  The same code can also be run in parallel. In download_all(), replace
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
=>
with concurrent.futures.ProcessPoolExecutor() as executor:

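  For an I/O-bound workload like this one, the parallel version is no faster than the concurrent one: the time is spent waiting on the network rather than on the CPU, and processes cost more to start than threads.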

What exactly are Futures?

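   In Python, Futures are found in both concurrent.futures and asyncio, and in both they represent a deferred operation. A Future wraps an operation that is waiting to run and places it in a queue. Its state can be queried at any time, and its result or exception can be retrieved once the operation completes.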
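   The lifecycle just described can be sketched with a task that is held back by a threading.Event, so that the pending state is observable. The helper names wait_then_double, fail, and demo are made up for this illustration; done(), result(), and exception() are the Future methods in concurrent.futures.

```python
import concurrent.futures
import threading

def wait_then_double(gate, x):
    # Block until the caller opens the gate, so the unfinished state can be observed.
    gate.wait()
    return x * 2

def fail():
    raise ValueError("boom")

def demo():
    gate = threading.Event()
    states = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(wait_then_double, gate, 21)
        states["done_before"] = future.done()  # False: the gate is still closed
        gate.set()
        states["result"] = future.result()     # blocks until the task finishes
        states["done_after"] = future.done()   # True once a result is available

        failed = executor.submit(fail)
        # exception() waits for completion and returns the raised exception, or None.
        states["error"] = repr(failed.exception())
    return states

if __name__ == "__main__":
    print(demo())
```

   The download example that follows applies the same calls to real HTTP requests.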
import concurrent.futures
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))
    return f'download {len(resp.content)} ok'

# def over(arg):
#     print(arg)
#     print('over')

def download_all(sites):
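    # Futures do not necessarily complete in the order in which they appear in the list.
    # Which one finishes first depends on system scheduling and on how long each future takes to run.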
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            # executor.submit returns a Future instance
            future = executor.submit(download_one, site)
            to_do.append(future)
            #future.add_done_callback(over)
        
        # Print each result as its future completes
        for future in concurrent.futures.as_completed(to_do):
            if future.exception() is not None:
                print(future.exception())
            else:
                print(future.result())

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

# Output
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 107634 from https://en.wikipedia.org/wiki/Portal:Biography
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 158984 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 157949 from https://en.wikipedia.org/wiki/Portal:Technology
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 94228 from https://en.wikipedia.org/wiki/Portal:Science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 468421 from https://en.wikipedia.org/wiki/PHP
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 0.21698231499976828 seconds

 

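  Futures do not necessarily complete in the order in which they appear in the list. Which one finishes first depends on system scheduling and on how long each future takes to run.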
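  The difference between submission order and completion order can be made visible with tasks of known duration. This is a small sketch with hypothetical helpers (sleep_and_return, completion_order), using sleeps in place of real downloads.

```python
import concurrent.futures
import time

def sleep_and_return(seconds):
    # Assumed stand-in for a task whose duration is known in advance.
    time.sleep(seconds)
    return seconds

def completion_order(delays):
    """Submit one task per delay and collect the results as each future completes."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
        futures = [executor.submit(sleep_and_return, d) for d in delays]
        # as_completed yields futures as they finish, not in submission order.
        return [f.result() for f in concurrent.futures.as_completed(futures)]

if __name__ == "__main__":
    print(completion_order([0.3, 0.1, 0.2]))
```

  With gaps this large between the delays, the shortest task would normally be reported first even though it was submitted second. When results must line up with their inputs, executor.map is the simpler choice, since it returns them in input order.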

  Concurrency is typically used for I/O-heavy workloads, while parallelism suits CPU-heavy workloads.

 References

   Geek Time (極客時間) column "Python Core Technology and Practice" (《Python核心技術與實戰》)

