python之concurrent.futures模塊


一、concurrent.futures模塊簡介

concurrent.futures 模塊提供了並發執行調用的高級接口

並發可以使用threads執行,使用ThreadPoolExecutor 或 分離的processes,使用ProcessPoolExecutor。都實現了同一個接口,這個接口在抽象類Executor定義

二、類的屬性和方法

concurrent.futures.wait(fstimeout=Nonereturn_when=ALL_COMPLETED):wait等待fs里面所有的Future實例(由不同的Executors實例創建的)完成。返回兩個命名元祖,第一個元祖名為done,存放完成的futures對象,第二個元祖名為not_done,存放未完成的futures。return_when參數必須是concurrent.futures里面定義的常量:FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED

concurrent.futures.as_completed(fstimeout=None):返回一個迭代器,yield那些完成的futures對象。fs里面有重復的也只可能返回一次。任何futures在調用as_completed()調用之前完成首先被yield。

三、Future對象

 Future()封裝了可調用對象的異步執行。Future實例可以被Executor.submit()方法創建。除了測試之外不應該直接創建。Future對象可以和異步執行的任務進行交互

cancel():嘗試去取消調用。如果調用當前正在執行,不能被取消。這個方法將返回False,否則調用將會被取消,方法將返回True

cancelled():如果調用被成功取消返回True

running():如果當前正在被執行不能被取消返回True

done():如果調用被成功取消或者完成running返回True

result(Timeout = None):拿到調用返回的結果。如果沒有執行完畢就會去等待

exception(timeout=None):捕獲程序執行過程中的異常

add_done_callback(fn):將fn綁定到future對象上。當future對象被取消或完成運行時,fn函數將會被調用

以下的方法是在unitest中

set_running_or_notify_cancel()

set_result(result)

set_exception(exception) 
Future方法

四、Executor對象

1、抽象類,提供異步調用的方法。不能被直接使用,而是通過構建子類。

2、方法

提交任務方式一:submit(fn*args**kwargs):調度函數fn(*args **kwargs)返回一個Future對象代表調用的執行。

提交任務方式二:map(func*iterablestimeout=Nonechunksize=1):和map(func, *iterables)相似。但是該map方法的執行是異步的。多個func的調用可以同時執行。當Executor對象是 ProcessPoolExecutor,才可以使用chunksize,將iterable對象切成塊,將其作為分開的任務提交給pool,默認為1。對於很大的iterables,設置較大chunksize可以提高性能(切記)。

shutdown(wait=True):給executor發信號,使其釋放資源,當futures完成執行時。已經shutdown再調用submit()或map()會拋出RuntimeError。使用with語句,就可以避免必須調用本函數

五、ThreadPoolExecutor對象

ThreadPoolExecutor是Executor的子類使用線程池來異步執行調用

如果使用不正確可能會造成死鎖,所以submit的task盡量不要調用executor和futures,否則很容易出現死鎖

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
相互等待的死鎖
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
等待自己的結果的死鎖

默認的max_workers是設備的處理器數目*5

六、ProcessPoolExecutor對象

 ProcessPoolExecutor同樣是Executor的子類。使用進程池來異步執行調用。

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
數據流程解釋

 

ProcessPoolExecutor使用multiprocessing模塊,不受GIL鎖的約束,意味着只有可以pickle的對象才可以執行和返回(pickle參考)

__main__必須能夠被工作子進程導入。所以意味着ProcessPoolExecutor在交互式解釋器下不能工作。

提交給ProcessPoolExecutor的可調用方法里面調用Executor或Future將會形成死鎖。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

max_workers默認是處理器的個數

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    115797848077098,
    1099726899285419]


def is_prime(n):
    """
    to judge the input number is prime or not
    :param n: input number
    :return: True or False
    """
    if n % 2 == 0:
        return False

    sqrt_n = int(math.(math.sqrt(n)))
    for i in range(3,sqrt_n + 1, 2):
        if n % i == 0:
            return False
        return True


def main():
    """
    create Process Pool to judge the numbers is prime or not
    :return: None
    """
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print(number,prime)

if __name__ == '__main__':
    main()
樣例

七、Exception類

exception concurrent.futures.CancelledError

exception concurrent.futures.TimeoutError

exception concurrent.futures.process.BrokenProcessPool

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM