python中同步、多線程、異步IO、多線程對IO密集型的影響


 

 

 

 1、常見並發類型

I/ O密集型:

藍色框表示程序執行工作的時間,紅色框表示等待I/O操作完成的時間。此圖沒有按比例顯示,因為internet上的請求可能比CPU指令要多花費幾個數量級的時間,所以你的程序可能會花費大部分時間進行等待。

 CPU密集型:

IO密集型程序將時間花在cpu計算上。

常見並發類型以及區別:

 

 

 2、同步版本

 我們將使用requests訪問100個網頁,使用同步的方式,requests的請求是同步的,所以代碼就很好寫了。

同步的版本代碼邏輯簡單,編寫也會很相對容易。

import requests
import time
def download_site(url,session):
    with session.get(url) as response:
        print(len(response.content))

def download_all_site(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url,session)

if __name__ =="__main__":
    sites = ["https://www.baidu.com","https://www.jython.org"] * 50
    start_time = time.time()
    download_all_site(sites)
    end_time = time.time()
    print("執行時間:%s" % (end_time - start_time) + "")
#download_site()只從一個URL下載內容並打印其大小
#需要知道的是我們這里沒有使用requests.get(),而使用了session.get(),我們使用requests.Session()創建了一個Session對象,每次請求使用了session.get(url,因為可以讓requests運用一些神奇的網絡小技巧,從而真正使程序加速。
#執行時間:33.91123294830322秒

 

 

 3、多線程

 ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。

你已經了解了Thread部分。那只是我們之前提到的一個思路。Pool部分是開始變得有趣的地方。這個對象將創建一個線程池,其中的每個線程都可以並發運行。最后,Executor是控制線程池中的每個線程如何以及何時運行的部分。它將在線程池中執行請求。

對我們很有幫助的是,標准庫將ThreadPoolExecutor實現為一個上下文管理器,因此你可以使用with語法來管理Threads池的創建和釋放。

一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每個站點上運行傳入函數。最重要的是,它使用自己管理的線程池自動並發地運行它們。

來自其他語言,甚至Python 2的人可能想知道,在處理threading時,管理你習慣的細節的常用對象和函數在哪里,比如Thread.start()、Thread.join()和Queue。

這些都還在那里,你可以使用它們來實現對線程運行方式的精細控制。但是,從Python 3.2開始,標准庫添加了一個更高級別的抽象,稱為Executor,如果你不需要精細控制,它可以為你管理許多細節。

本例中另一個有趣的更改是,每個線程都需要創建自己的request . Session()對象。當你查看requests的文檔時,不一定就能很容易地看出,但在閱讀這個問題(https://github.com/requests/requests/issues/2766  )時,你會清晰地發現每個線程都需要一個單獨的Session。

這是threading中有趣且困難的問題之一。因為操作系統可以控制任務何時中斷,何時啟動另一個任務,所以線程之間共享的任何數據都需要被保護起來,或者說是線程安全的。不幸的是,requests . Session()不是線程安全的。

根據數據是什么以及如何你使用它們,有幾種策略可以使數據訪問變成線程安全的。其中之一是使用線程安全的數據結構,比如來自 Python的queue模塊的Queue。

這些對象使用低級基本數據類型,比如threading.Lock,以確保只有一個線程可以同時訪問代碼塊或內存塊。你可以通過ThreadPoolExecutor對象間接地使用此策略。

import requests
import concurrent.futures
import threading
import time

#創建線程池
thread_local= threading.local()

def get_session():
    if not getattr(thread_local,"session",None):
        thread_local.session = requests.Session()
    return thread_local.session

def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(len(response.content))

def download_all_site(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exector:
        exector.map(download_site,sites)

if __name__ =="__main__":
    sites = ["https://www.baidu.com","https://www.jython.org"] * 50
    start_time = time.time()
    download_all_site(sites)
    end_time = time.time()
    print("執行時間:%s" % (end_time - start_time) + "")
#執行時間:6.152076244354248秒

 

這里要使用的另一種策略是線程本地存儲。Threading.local()會創建一個對象,它看起來像一個全局對象但又是特定於每個線程的。在我們的示例中,這是通過threadLocal和get_session()完成的:

ThreadLocal是threading模塊中專門用來解決這個問題的。它看起來有點奇怪,但是你只想創建其中一個對象,而不是為每個線程創建一個對象。對象本身將負責從不同的線程到不同的數據的分開訪問。

當get_session()被調用時,它所查找的session是特定於它所運行的線程的。因此,每個線程都將在第一次調用get_session()時創建一個單個的會話,然后在整個生命周期中對每個后續調用使用該會話。

最后,簡要介紹一下選擇線程的數量。你可以看到示例代碼使用了5個線程。隨意改變這個數字,看看總時間是如何變化的。你可能認為每次下載只有一個線程是最快的,但至少在我的系統上不是這樣。我在5到10個線程之間找到了最快的結果。如果超過這個值,那么創建和銷毀線程的額外開銷就會抵消程序節省的時間。

這里比較困難的答案是,從一個任務到另一個任務的正確線程數不是一個常量。需要進行一些實驗來得到。

 注意:request . Session()不是線程安全的。這意味着,如果多個線程使用同一個Session,那么在某些地方可能會發生上面描述的交互類型問題。

 

多線程代碼的執行時序表:

 

 

 

 4、異步IO

asyncio的一般概念是一個單個的Python對象,稱為事件循環,它控制每個任務如何以及何時運行。事件循環會關注每個任務並知道它處於什么狀態。在實際中,任務可以處於許多狀態,但現在我們假設一個簡化的只有兩種狀態的事件循環。

就緒狀態將表明一個任務有工作要做,並且已經准備好運行,而等待狀態意味着該任務正在等待一些外部工作完成,例如網絡操作。

我們簡化的事件循環維護兩個任務列表,每一個對應這些狀態。它會選擇一個就緒的任務,然后重新啟動它。該任務處於完全控制之中,直到它配合地將控制權交還給事件循環為止。

當正在運行的任務將控制權交還給事件循環時,事件循環將該任務放入就緒或等待列表中,然后遍歷等待列表中的每個任務,以查看I/O操作完成后某個任務是否已經就緒。時間循環知道就緒列表中的任務仍然是就緒的,因為它知道它們還沒有運行。

一旦所有的任務都重新排序到正確的列表中,事件循環將選擇下一個要運行的任務,然后重復這個過程。我們簡化的事件循環會選擇等待時間最長的任務並運行該任務。此過程會一直重復,直到事件循環結束。

asyncio的一個重要之處在於,如果沒有刻意去釋放控制權,任務是永遠不會放棄控制權的。它們在操作過程中從不會被打斷。這使得我們在asyncio中比在threading中能更容易地共享資源。你不必擔心代碼是否是線程安全的。

import time
import asyncio
from aiohttp import ClientSession
async def download_site(session,url):
    global i
    try:
        async with session.get(url) as response:
            i=i+1
            print(i)
            return await response.read()
    except Exception as e:
         pass
async def download_all_site(sites):
    async with ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.create_task(download_site(session,url))
            tasks.append(task)
        result = await asyncio.gather(*tasks) #等待一組協程運行結束並接收結果
        print(result)


if __name__ =="__main__":
    i=0
    sites = ["http://www.360kuai.com/","https://www.jython.org"] * 50
    start_time = time.time()
    asyncio.run(download_all_site(sites))
    end_time = time.time()
    print("執行時間:%s" % (end_time - start_time) + "")

#執行時間:5.29184889793396秒

異步IO的執行時序表:

asyncio版本的問題

此時asyncio有兩個問題。你需要特殊的異步版本的庫來充分利用asycio。如果你只是使用requests下載站點,那么速度會慢得多,因為requests的設計目的不是通知事件循環它被阻塞了。隨着時間的推移,這個問題變得微不足道,因為越來越多的庫包含了asyncio。

另一個更微妙的問題是,如果其中一個任務不合作,那么協作多任務處理的所有優勢都將不存在。代碼中的一個小錯誤可能會導致任務運行超時並長時間占用處理器,使需要運行的其他任務無法運行。如果一個任務沒有將控制權交還給事件循環,則事件循環無法中斷它。

考慮到這一點,我們來開始討論一種完全不同的並發性——multiprocessing。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM