https://docs.python.org/3/library/concurrent.futures.html
17.4.1 Executor Objects
class concurrent.futures.Executor
An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.
Methods:
submit(fn, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)
Similar to map(func, *iterables) except:
the iterables are collected immediately rather than lazily;
func is executed asynchronously and several calls to func may be made concurrently.
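To make the difference from the built-in map concrete, here is a minimal sketch. The helper names slow_square and squares_in_order are made up for this illustration. The calls run concurrently and finish in a different order than they were submitted, yet the results still come back in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Illustrative helper: larger inputs sleep less, so they finish first.
    time.sleep(0.05 * (3 - x))
    return x * x


def squares_in_order(values):
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Executor.map yields results in input order, not completion order.
        return list(executor.map(slow_square, values))


if __name__ == "__main__":
    print(squares_in_order([0, 1, 2]))  # [0, 1, 4]
```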
The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map(). timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time.
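The timeout behaviour can be tried out with a small sketch like the one below. The names sleepy and first_result_or_timeout are hypothetical, chosen only for this example. Note that leaving the with block still waits for the sleeping task to finish.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def sleepy(seconds):
    # Illustrative task that simply takes `seconds` to complete.
    time.sleep(seconds)
    return seconds


def first_result_or_timeout(delay, timeout):
    with ThreadPoolExecutor(max_workers=1) as executor:
        # The timeout clock starts at the call to map(), not at next().
        results = executor.map(sleepy, [delay], timeout=timeout)
        try:
            return next(results)
        except TimeoutError:
            return "timed out"


if __name__ == "__main__":
    print(first_result_or_timeout(0.3, 0.05))
    print(first_result_or_timeout(0.01, 1))
```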
If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
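A short sketch of this, using made-up helper names (reciprocal, collect_until_error): the call to map() itself does not raise, and the exception only surfaces when iteration reaches the failing item.

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(x):
    return 1 / x


def collect_until_error(values):
    collected = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(reciprocal, values)  # nothing is raised here
        try:
            for value in results:
                collected.append(value)
        except ZeroDivisionError:
            # Raised only when the failing result is retrieved.
            collected.append("error")
    return collected


if __name__ == "__main__":
    print(collect_until_error([1, 2, 0, 4]))  # [1.0, 0.5, 'error']
```

Results after the failing item are not retrieved, because the iterator ends once the exception has been raised.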
When using ProcessPoolExecutor, this method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor, chunksize has no effect.
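The chunking idea can be pictured with a plain-Python sketch. split_into_chunks is a hypothetical helper written for this note, not the library's own code. It only shows how the zipped argument tuples could be grouped into tasks of at most chunksize items.

```python
from itertools import islice


def split_into_chunks(*iterables, chunksize=1):
    """Hypothetical helper: group zipped arguments into tuples of at most
    `chunksize` items, the way one task per chunk could be formed."""
    if chunksize < 1:
        raise ValueError("chunksize must be >= 1")
    rows = zip(*iterables)
    while True:
        chunk = tuple(islice(rows, chunksize))
        if not chunk:
            return
        yield chunk


if __name__ == "__main__":
    for chunk in split_into_chunks([1, 2, 3, 4, 5], [10, 20, 30, 40, 50], chunksize=2):
        print(chunk)
```

With chunksize=2, five argument pairs become three tasks instead of five, which is where the saving in inter-process communication comes from.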
shutdown(wait=True)
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError.
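A minimal sketch of the RuntimeError rule, with a hypothetical function name (submit_after_shutdown): work submitted before shutdown still completes, while a submit made afterwards is rejected.

```python
from concurrent.futures import ThreadPoolExecutor


def submit_after_shutdown():
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(pow, 2, 10)
    executor.shutdown(wait=True)  # returns once pending work has finished
    try:
        executor.submit(pow, 2, 11)
    except RuntimeError:
        rejected = True
    else:
        rejected = False
    return future.result(), rejected


if __name__ == "__main__":
    print(submit_after_shutdown())  # (1024, True)
```

Using the executor in a with statement, as the other examples do, calls shutdown automatically when the block exits.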
17.4.2 ThreadPoolExecutor
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
Deadlocks can occur when the callable associated with a Future waits on the results of another Future.
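The deadlock case can be demonstrated safely by adding a timeout. In this sketch (the names nested_wait_demo, outer and inner are invented for the example) the pool has a single worker, so the inner task cannot start while the outer task is waiting for it. Without the timeout, outer would block forever.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def nested_wait_demo():
    executor = ThreadPoolExecutor(max_workers=1)

    def inner():
        return "inner done"

    def outer():
        # The only worker thread is busy running outer(), so inner() stays queued.
        inner_future = executor.submit(inner)
        try:
            return inner_future.result(timeout=0.2)
        except TimeoutError:
            return "inner never started"

    with executor:
        return executor.submit(outer).result()


if __name__ == "__main__":
    print(nested_wait_demo())  # inner never started
```

Raising max_workers, or not waiting on another future from inside a task, avoids the problem in this small case.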
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
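Both constructor parameters can be observed with a small sketch. worker_thread_names is a hypothetical helper that collects the names of the threads that actually ran the tasks. The pool never uses more than max_workers threads, and their names start with thread_name_prefix.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def worker_thread_names(task_count, max_workers, prefix):
    def current_name(_):
        # Runs inside a pool thread and reports that thread's name.
        return threading.current_thread().name

    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix=prefix) as executor:
        return set(executor.map(current_name, range(task_count)))


if __name__ == "__main__":
    print(worker_thread_names(8, 2, "demo"))
```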
17.4.3 ProcessPoolExecutor
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
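Whether an object can cross the process boundary can be checked up front with pickle. The is_picklable helper below is an assumption of this note, not part of concurrent.futures. It shows that a built-in function such as pow pickles fine, while a lambda does not.

```python
import pickle


def is_picklable(obj):
    """Hypothetical helper: report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    print(is_picklable(pow))          # True
    print(is_picklable(lambda x: x))  # False
```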
The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.
Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock.
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is less than or equal to 0, then a ValueError will be raised.
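The ValueError rule is easy to check without starting any work. rejects_worker_count is a hypothetical helper name. It only constructs the executor and reports whether the constructor refused the value.

```python
from concurrent.futures import ProcessPoolExecutor


def rejects_worker_count(max_workers):
    try:
        executor = ProcessPoolExecutor(max_workers=max_workers)
    except ValueError:
        return True
    executor.shutdown()  # valid value: release the executor again
    return False


if __name__ == "__main__":
    print(rejects_worker_count(0))   # True
    print(rejects_worker_count(-1))  # True
```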
17.4.4 Future Objects
The Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit().
class concurrent.futures.Future
Encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit() and should not be created directly except for testing.
cancel()
Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.
cancelled()
Return True if the call was successfully cancelled.
running()
Return True if the call is currently being executed and cannot be cancelled.
done()
Return True if the call was successfully cancelled or finished running.
result(timeout=None)
Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
If the future is cancelled before completing then CancelledError will be raised.
If the call raised, this method will raise the same exception.
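The methods above can be exercised together in one sketch. The names future_states and blocker are invented for this example, and the two threading.Event objects are only there to make the timing deterministic. A single-worker pool runs one task while a second stays queued, so the first cannot be cancelled and the second can.

```python
import threading
from concurrent.futures import CancelledError, ThreadPoolExecutor


def future_states():
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()   # signal that this call is now executing
        release.wait()  # stay "running" until the main thread lets go
        return "finished"

    states = {}
    with ThreadPoolExecutor(max_workers=1) as executor:
        first = executor.submit(blocker)
        second = executor.submit(pow, 2, 3)  # queued behind blocker
        started.wait()
        try:
            states["first_running"] = first.running()
            states["first_cancel"] = first.cancel()      # already executing
            states["second_cancel"] = second.cancel()    # still pending
            states["second_cancelled"] = second.cancelled()
            states["second_done"] = second.done()
        finally:
            release.set()
        states["first_result"] = first.result()
        try:
            second.result()
        except CancelledError:
            states["second_result"] = "CancelledError"
    return states


if __name__ == "__main__":
    for name, value in future_states().items():
        print(name, value)
```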
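Question 1:
After a call is submitted with submit(), its result can be read with result(). How are the results read when the work is submitted with map()? map() returns a generator object, so the results can be read by passing it to list or tuple.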
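from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 3, 2)
    print(future.result())

from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':  # omitting this guard raises an error where workers are spawned (e.g. Windows)
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(pow, 3, 2)
        print(future.result())
# 9
# 9
# 9
# One more 9 than expected: the ThreadPoolExecutor block sits outside the
# if __name__ == '__main__' guard, so it runs again when the worker process
# re-imports this module.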
from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':  # omitting this guard raises an error where workers are spawned (e.g. Windows)
    with ProcessPoolExecutor(max_workers=1) as executor:
        results = executor.map(pow, [1, 2], [3, 4])  # a generator, not a Future
        print(type(results), list(results))
# <class 'generator'> [1, 16]
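Execution flow:
The ProcessPoolExecutor class uses the low-level machinery provided by the multiprocessing module to carry out the following steps:
1. Each item of input data from the two lists [1,2] and [3,4] is passed to map.
2. The data is serialized into binary form with the pickle module.
3. The serialized data is sent through a local socket from the process of the main interpreter to the process of a child interpreter.
4. In the child process, pickle deserializes the binary data back into Python objects.
5. The Python module that contains the pow function is imported.
6. The child processes run pow on their respective input data in parallel.
7. The result is serialized into bytes.
8. Those bytes are copied back to the main process through the socket.
9. The main process deserializes the bytes back into Python objects.
10. Finally, the results computed by the child processes are merged into a single list and returned to the caller.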
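The serialization steps above can be imitated in a single process to see what actually travels between the interpreters. This is only a simulation under stated assumptions: simulate_round_trip and simulate_map are hypothetical names, and no sockets or child processes are involved. The pickled bytes simply stand in for what would be sent.

```python
import pickle


def simulate_round_trip(func, args):
    # Steps 2-3: the parent serializes the function reference and arguments.
    request = pickle.dumps((func, args))
    # Steps 4-5: the "child" deserializes them; unpickling a function looks it
    # up by module and name, which is why its module must be importable.
    child_func, child_args = pickle.loads(request)
    # Step 6: the child runs the function on its input.
    result = child_func(*child_args)
    # Steps 7-8: the result is serialized and sent back as bytes.
    response = pickle.dumps(result)
    # Step 9: the parent deserializes the result.
    return pickle.loads(response)


def simulate_map(func, *iterables):
    # Steps 1 and 10: pair up the inputs and merge the results into one list.
    return [simulate_round_trip(func, args) for args in zip(*iterables)]


if __name__ == "__main__":
    print(simulate_map(pow, [1, 2], [3, 4]))  # [1, 16]
```

Every call pays for two pickle round trips, which is why process pools tend to help most when each task does a lot of computation on a small amount of data.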
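Question 2: Can concurrent.futures speed up execution?
The results of the code below show that the futures module nearly doubles the speed: 3.197 seconds with the built-in map versus 1.683 seconds with ProcessPoolExecutor.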
from concurrent import futures
import time

def gcd(pair):
    # Brute-force greatest common divisor: try every candidate from the
    # smaller number downwards.
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [(19622022, 22737382), (2332312, 932326), (19649022, 22736382), (2764312, 9329765)]

# Serial version, for comparison:
# start = time.time()
# results = list(map(gcd, numbers))
# print(results)
# end = time.time()
# print('took {:.3f} seconds'.format(end - start))
# [2, 2, 6, 1]
# took 3.197 seconds

if __name__ == '__main__':
    start = time.time()
    pool = futures.ProcessPoolExecutor()
    results = list(pool.map(gcd, numbers))
    print(results)
    end = time.time()
    print('took {:.3f} seconds'.format(end - start))
# [2, 2, 6, 1]
# took 1.683 seconds