Python並行編程(十四):異步編程


1、基本概念

  除了順序執行和並行執行的模型以外,還有異步模型,這是事件驅動模型的基礎。異步活動的執行模型可以只有一個單一的主控制流,能在單核心系統和多核心系統中運行。

  在並發執行的異步模型中,許多任務被穿插在同一時間線上,所有的任務都由一個控制流執行(單一線程)。任務的執行可能被暫停或恢復,中間的這段時間線程將會執行其他任務。大致如下:

  

  如上圖所示,任務(不同的顏色表示不同的任務)可能被其他任務插入,但是都處在同一個線程下。這表明當某一個任務執行的時候,其他任務都暫停了。與多線程編程模型很大的一點不同是,多線程的某個任務在時間線上什么時候掛起某個活動或恢復某個活動由系統決定,而在異步中,程序員必須假設線程可能在任何時間被掛起和替換。

  程序員可以將任務編寫成許多可以間隔執行的小步驟,如果一個任務需要另一個任務的輸出,那么被依賴的任務必須接收它的輸入。

2、使用Python的concurrent.futures模塊

  這個模塊具有線程池和進程池、管理並行編程任務、處理非確定性的執行流程、進程/線程同步等功能。

  此模塊由一下部分組成:

  - concurrent.futures.Executor:這是一個虛擬基類,提供了異步執行的方法。

  - submit(function, argument):調度函數(可調用的對象)的執行,將argument作為參數傳入。

  - map(function, argument):將argument作為參數執行函數,以異步的方式。

  - shutdown(Wait=True):發出讓執行者釋放所有資源的信號。

  - concurrent.futures.Future:其中包括函數的異步執行。Future對象是submit任務(即帶有參數的functions)到executor的實例。

  Executor是抽象類,可以通過子類訪問,即線程或進程的ExecutorPools。因為線程或進程的實例是依賴於資源的任務,所以最好以池的形式將他們組織在一起,作為可以重用的launcher和executor。

  線程池和進程池是用於在程序中優化和簡化線程/進程的使用。通過池可以提交任務給executor。池由兩部分組成,一部分是內部的隊列,存放着待執行的任務;另一部分是一系列的進程或線程,用於執行這些任務。池的概念主要目的是為了重用:讓線程或進程在生命周期內可以多次使用。他減少了創建線程和進程的開銷,提高了程序性能。重用不是必須的規則,但它是程序員在應用中使用池的主要原因。

  

  current.Futures提供了兩種Executor的子類,各自獨立操作一個線程池和一個進程池。這兩個子類分別是:

  - concurrent.futures.ThreadPoolExecutor(max_workers)

  - concurrent.futures.ProcessPoolExecutor(max_workers)

  max_workers參數表示最多有多少個worker並行執行任務

  代碼測試:

import concurrent.futures
import time

number_list = [1,2,3,4,5,6,7,8,9,10]

def evaluate_item(x):
    #For time consuming
    result_item = count(x)
    return result_item

def count(number):
    for i in range(0, 10000000):
        i = i + 1
    return i * number

if __name__ == "__main__":
    # Sequential execution
    start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in %s seconds" %(str(time.time() - start_time)))
    # Thread pool execution
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in %s seconds"  %(str(time.time() - start_time_1)))

    # Process pool execution
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
    print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))

  運行結果:

10000000
20000000
30000000
40000000
50000000
60000000
70000000
80000000
90000000
100000000
Sequential execution in 8.975373029708862 seconds
10000000
20000000
30000000
40000000
60000000
70000000
50000000
80000000
90000000
100000000
Thread pool execution in 8.699156045913696 seconds
Process pool execution in 5.916198968887329 seconds

  創建一個list存放10個數字,然后使用一個循環計算從1加到10000000,打印出和與number_list的乘積。

number_list = [1,2,3,4,5,6,7,8,9,10]

def evaluate_item(x):
    #For time consuming
    result_item = count(x)
    return result_item

def count(number):
    for i in range(0, 10000000):
        i = i + 1
    return i * number

  在主程序中,首先順序執行了一次程序並打印其執行時間:

start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in %s seconds" %(str(time.time() - start_time)))

  其次使用futures.ThreadPoolExecutor模塊的線程池並打印其時間:

start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in %s seconds"  %(str(time.time() - start_time_1)))

  ThreadPoolExecutor使用線程池中的一個線程執行給定任務。池中一共有5個線程,每一個線程從池中取得一個任務然后執行它,當任務執行完成,再從池中拿到另一個任務。

  最后是使用進程池:

start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
    print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))

  和ThreadPoolExecutor一樣,ProcessPoolExecutor是一個executor,使用一個線程池來並行執行任務。因為ProcessPoolExecutor使用了多核處理的模塊,讓我們可以不受GIL的限制,大大縮短執行時間。

  幾乎所有需要處理多個客戶端請求的服務應用都會使用池。也有應用要求需要立即執行,或者要求對任務的線程有更多的控制器,這種情況下,池不是一個最佳選擇。

3、使用Asyncio管理事件循環

  先入為主:

import asyncio
import datetime
import time

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

def function_2(end_time, loop):
    print("function_2 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()

def function_3(end_time, loop):
    print("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()

def function_4(end_time, loop):
    print("function_4 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()
print(loop.time())
end_loop = loop.time() + 9.0
print(end_loop)
loop.call_soon(function_1, end_loop, loop)
#loop.call_soon(function_4, end_loop, loop)
loop.run_forever()
loop.close()

  執行結果:

  

  上述例子定義了三個異步任務,相繼執行,如圖所示:

  

  首先,我們要得到這個事件循環:

loop = asyncio.get_event_loop()

  然后我們通過call_soon方法調用了function_1()函數。

end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)

  function_1:

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

  - end_time定義了function_1可以運行的最長時間,並通過call_later方法傳入到function_2中作為參數

  - loop通過get_event_loop()方法得到的事件循環

  任務執行結束之后,它會比較loop.time() + 1s和設定的運行時間,如果沒有超過,使用call_later在1秒之后執行function_2(),function_2和3作用類似

  如果運行時間超過了設定,時間循環終止。

  概念解釋:

  Python的Asyncio模塊提供了管理事件、協程、任務和線程方法,以及編寫並發代碼的原語。主要組件和概念包括:

  - 事件循環:在Asyncio模塊中,每一個進程都有一個事件循環。

  - 協程:這是子程序的泛化概念。協程可以在執行期間暫停,這樣就可以等待外部的處理(例如IO)完成之后,從之前暫停的地方恢復執行。

  - Futures:定義了Future對象,和concurrent.futures模塊一樣,表示尚未完成的計算。

  - Tasks:這是Asyncio的子類,用於封裝和管理並行模式下的協程。

  事件循環:

    在計算機系統中,可以產生事件的實體叫做事件源,能處理事件的實體叫做事件處理者,還有一些第三方實體叫做事件循環。它的作用是管理所有的事件,在整個程序運行過程中不斷循環執行,追蹤事件發生的順序將他們放到隊列中,當主線程空閑的時候,調用相應的事件處理者處理事件。

   Asyncio管理事件循環的方法:

  - loop = get_event_loop():得到當前上下文的事件循環。

  - loop.call_later(time_delay, callback, argument):延后time_delay秒再執行callback方法。

  - loop.call_soon(callback, argument):盡可能快的調用callback。call_soon()函數結束,主線程回到事件循環之后就會馬上調用callback。

  - loop.time():以float類型返回當前時間循環的內部時間。

  - asyncio.set_event_loop():為當前上下文設置時間循環。

  - asyncio.new_event_loop():根據此策略創建一個新的時間循環並返回。

  - loop.run_forever():在調用stop()之前將一直運行。run_forever真正開始執行函數。

 

4、使用Asyncio管理協程

  上述例子中一個程序變得很大而且復雜時,將其划分為子程序,每一部分實現特定的任務。子程序不能單獨執行,只能在主程序的請求下執行,主程序負責協調使用各個子程序。協程是子程序的泛化,和子程序一樣的是,協程只負責計算任務的一步;不同的是協程沒有主程序來進行調度。因為協程通過管道連接在一起,沒有監視函數負責順序調用他們。在協程中,執行點可以被掛起,可以被之前掛起的點恢復執行。通過協程池就可以插入到計算中:運行第一個任務,直到它返回yield執行權,然后運行下一個,這樣順着執行下去。

  這種插入的控制組件就是前文提到的事件循環,它持續追蹤所有的協程並執行它們。

  協程的另外一些重要特性如下:

  - 協程可以有多個入口點,並可以yield多次

  - 協程可以將執行權交給其他協程

  yield表示協程在此暫停,並且將執行權交給其他協程,因為協程可以將值與控制權一起傳遞給另一個協程,所以yield一個值就表示將值傳給下一個執行的協程。

  測試用例:

import asyncio
import time
from random import randint

@asyncio.coroutine
def StartState():
    print("Start State called \n")
    input_value = randint(0,1)
    time.sleep(1)
    print("I am StartState.input_value is %s" %input_value)
    if (input_value == 0):
        result = yield from State2(input_value)
    else:
        result = yield from State1(input_value)
    print("Resume of the Transition : \nStart State calling %s" %result)

@asyncio.coroutine
def State1(transition_value):
    outputValue = str("State 1 with transition value = %s \n" %transition_value)
    input_value = randint(0,1)
    time.sleep(1)
    print("...Evaluation...")
    print("I am State1.input_value is %s" %input_value)
    if input_value == 0:
        result = yield from State3(input_value)
    else:
        result = yield from State2(input_value)
    result = "State 1 calling %s" %result
    return outputValue + str(result)

@asyncio.coroutine
def State2(transition_value):
    outputValue = str("State 2 with transition value = %s \n" %transition_value)
    input_value = randint(0,1)
    time.sleep(1)
    print("...Evaluation...")
    print("I am State2.input_value is %s" %input_value)
    if input_value == 0:
        result = yield from State1(input_value)
    else:
        result = yield from State3(input_value)
    result = "State 2 calling %s" %result
    return outputValue + str(result)

@asyncio.coroutine
def State3(transition_value):
    outputValue = str("State 3 with transition value = %s \n" %transition_value)
    input_value = randint(0,1)
    time.sleep(1)
    print("...Evaluation...")
    print("I am State3.input_value is %s" %input_value)
    if input_value == 0:
        result = yield from State1(input_value)
    else:
        result = yield from EndState(input_value)
    result = "State 1 calling %s" %result
    return outputValue + str(result)

@asyncio.coroutine
def EndState(transition_value):
    outputValue = str("End State with transition value = %s \n" %transition_value)
    print("I am EndState.outputValue is %s" %outputValue)
    print("...Stop Computation...")

    return outputValue

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())

  上述代碼為使用Asyncio的協程來模擬有限狀態機(一個數學模型,不僅在工程領域應用廣泛,在科學領域也很著名)。模擬的狀態機如下:

  

  系統有四個狀態,0和1是狀態機可以從一個狀態到另一個狀態的值,這個過程叫轉換。

  運行結果(結果不唯一):

  

  每一個狀態都由一個裝飾器裝飾:@asyncio.coroutine

  通過yield from命令調用下一個協程。

  啟動事件循環:

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())

 

5、使用Asyncio控制任務

  Asyncio是用來處理事件循環中的異步進程和並發任務執行的。它還提供了asyncio.Task()類,可以在任務中使用協程。它的作用是在同一事件循環中,運行某一個任務的同時可以並發地運行多個任務。當協程被包在任務中,它會自動將任務和事件循環連接起來,當事件循環啟動的時候,任務自動運行。這樣就提供了一個可以自動驅動協程的機制。

  Asyncio模塊為我們提供了asyncio.Task(coroutine)方法來處理計算任務,它可以調度協程的執行。任務對協程對象在事件循環的執行負責。如果被包裹的協程要從future yield,那么任務會被掛起,等待future的計算結果。

  當future計算完成,被包裹的協程將會拿到future返回的結果或異常(exception)繼續執行。另外,需要注意的是事件循環一次只能運行一個任務,除非還有其它事件循環在不同的線程並行運行,此任務才有可能和其他任務並行。當一個任務在等待future執行的期間,事件循環會運行一個新的任務。

  測試用例:

import asyncio

@asyncio.coroutine
def factorial(number):
    f = 1
    for i in range(2, number + 1):
        print("Asyncio.Task: Compute factorial(%s)" %i)
        yield from asyncio.sleep(0.5)
        f *= i
    print("Asyncio.Task - factorial(%s) = %s" %(number, f))

@asyncio.coroutine
def fibonacci(number):
    a,b = 0,1
    for i in range(number):
        print("Asyncio.Task: Compute fibonacci(%s)" %i)
        yield from asyncio.sleep(0.5)
        a, b = b, a+b
    print("Asyncio.Task - fibonacci(%s) = %s" %(number, a))

@asyncio.coroutine
def binomialCoeff(n, k):
    result = 1
    for i in range(1, k+1):
        result = result * (n-i+1)/i
        print("Asyncio.Task:Compute binomialCoeff(%s)" %i)
        yield from asyncio.sleep(0.5)
    print("Asyncio.Task - binomialCoeff(%s, %s) = %s" %(n, k, result))

if __name__ == "__main__":
    tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

  執行結果:

Asyncio.Task: Compute factorial(2)
Asyncio.Task: Compute fibonacci(0)
Asyncio.Task:Compute binomialCoeff(1)
Asyncio.Task: Compute factorial(3)
Asyncio.Task: Compute fibonacci(1)
Asyncio.Task:Compute binomialCoeff(2)
Asyncio.Task: Compute factorial(4)
Asyncio.Task: Compute fibonacci(2)
Asyncio.Task:Compute binomialCoeff(3)
Asyncio.Task: Compute factorial(5)
Asyncio.Task: Compute fibonacci(3)
Asyncio.Task:Compute binomialCoeff(4)
Asyncio.Task: Compute factorial(6)
Asyncio.Task: Compute fibonacci(4)
Asyncio.Task:Compute binomialCoeff(5)
Asyncio.Task: Compute factorial(7)
Asyncio.Task: Compute fibonacci(5)
Asyncio.Task:Compute binomialCoeff(6)
Asyncio.Task: Compute factorial(8)
Asyncio.Task: Compute fibonacci(6)
Asyncio.Task:Compute binomialCoeff(7)
Asyncio.Task: Compute factorial(9)
Asyncio.Task: Compute fibonacci(7)
Asyncio.Task:Compute binomialCoeff(8)
Asyncio.Task: Compute factorial(10)
Asyncio.Task: Compute fibonacci(8)
Asyncio.Task:Compute binomialCoeff(9)
Asyncio.Task - factorial(10) = 3628800
Asyncio.Task: Compute fibonacci(9)
Asyncio.Task:Compute binomialCoeff(10)
Asyncio.Task - fibonacci(10) = 55
Asyncio.Task - binomialCoeff(20, 10) = 184756.0

  上述例子定義了三個線程,factorial,fibonacci,binomialCoeff,每一個都帶有asyncio.coroutine裝飾器:

  將三個task放入到一個list中:

tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]

  通過run_until_complete並行運行三個協程,asyncio.wait(tasks)表示運行直到所有給定的協程都完成。

  最后關閉事件循環:loop.close()

 

6、使用Asyncio和Futures

  Asyncio模塊的另一個重要的組件是Futures。它和concurrent.futures.Futures很像,但是針對Asyncio的事件循環做了很多定制。asyncio.Futures類代表還未完成的結果,有可能是一個Exception,所以綜合來說,它是一種抽象的代表還沒有做完的事情。

  實際上,必須處理一些結果的回調函數被加入到了這個類的實例中。

  基本方法:

  - cancel():取消future的執行,調度回調函數

  - result():返回future代表的結果

  - exception():返回future中的Exception

  - add_done_callback(fn):添加一個回調函數,當future執行的時候會調用這個回調函數

  - remove_done_callback(fn):從call when done列表中移除所有的callback的實例

  - set_result(result):將future標為執行完成,並且設置result的值

  - set_exception(exception):將future標為執行完成,並設置Exception

  測試用例:

# coding : utf-8
import asyncio
import sys

@asyncio.coroutine
def first_coroutine(future, n):
    # 計算前n個數的和
    count = 0
    for i in range(1, n+1):
        count = count + i
    print("first yield")
    yield from asyncio.sleep(2)
    print("first_coroutine finished")
    # 將future標記為已完成,並設置result的值
    future.set_result("first coroutine (sum of n integers) result = %s" %str(count))

@asyncio.coroutine
def second_coroutine(future, n):
    count = 1
    for i in range(2, n+1):
        count *= i
    print("second yield")
    yield from asyncio.sleep(1)
    print("second_coroutine finished")
    future.set_result("second coroutine (factorial) result = %s" %str(count))

def got_result(future):
    # 獲取future的set_result結果
    print(future.result())

if __name__ == "__main__":
    N1 = int(sys.argv[1])
    N2 = int(sys.argv[2])
    loop = asyncio.get_event_loop()
    future1 = asyncio.Future()
    future2 = asyncio.Future()
    tasks = [first_coroutine(future1, N1), second_coroutine(future2, N2)]

    # 添加回調函數
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

  運行結果:

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM