concurrent.futures
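concurrent.futures provides a high-level interface for asynchronously executing callables.
The asynchronous execution can be performed with threads (ThreadPoolExecutor) or with separate processes (ProcessPoolExecutor).
The module is new in Python 3.2, but Python 2 is supported as well.
On Python 2 the futures module has to be installed: https://pypi.python.org/pypi/futures/2.1.4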
【Example 1】A non-concurrent example
#!/usr/bin/env python2.6
from Queue import Queue
import random
import time

q = Queue()
fred = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def f(x):
    if random.randint(0, 1):
        time.sleep(0.1)
    res = x * x
    q.put(res)


def main():
    for num in fred:
        f(num)
    while not q.empty():
        print q.get()


if __name__ == "__main__":
    main()
【Example 2】Using ThreadPoolExecutor
#!/usr/bin/env python2.7
from Queue import Queue
import concurrent.futures
import random
import time

q = Queue()
fred = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def f(x):
    if random.randint(0, 1):
        time.sleep(0.1)
    res = x * x
    q.put(res)


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    while not q.empty():
        print q.get()


if __name__ == "__main__":
    main()
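All jobs are processed by the 4 workers of the thread pool.
The with statement guarantees that all threads have finished before the code after it runs.
The results are stored in a queue, and the queue is thread-safe.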
“The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.”
In other words, the Queue class does all the locking itself, so several worker threads can put results into the same queue without any extra synchronization.
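For readers on Python 3, Example 2 can be sketched roughly as follows. This is an adaptation, not the original script: the module is called queue instead of Queue, print is a function, the sleep is shortened, and the helper names square_into and run are made up for illustration.

```python
import concurrent.futures
import queue
import random
import time


def square_into(results, x):
    """Square x, sometimes after a short pause, and put it in the shared queue."""
    if random.randint(0, 1):
        time.sleep(0.01)  # simulate a slow call
    results.put(x * x)


def run(numbers, max_workers=4):
    """Square all numbers on a thread pool and return the results as a list."""
    results = queue.Queue()  # thread-safe, so no extra locking is needed
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for num in numbers:
            executor.submit(square_into, results, num)
    # The with block has exited, so every submitted job has finished.
    collected = []
    while not results.empty():
        collected.append(results.get())
    return collected


if __name__ == "__main__":
    # The order depends on thread scheduling, so sort before printing.
    print(sorted(run(range(1, 11))))
```

Because the threads finish in no particular order, the values leave the queue in an unpredictable order; only the set of results is fixed.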
【Example 3】Using ProcessPoolExecutor
“The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.”
In other words, ProcessPoolExecutor runs the calls in a pool of worker processes, which avoids the Global Interpreter Lock, but the callables, their arguments and their return values all have to be picklable.
#!/usr/bin/env python2.7
import sys
import redis
import concurrent.futures

r = redis.Redis()
fred = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def check_server():
    try:
        r.info()
    except redis.exceptions.ConnectionError:
        print >>sys.stderr, "Error: cannot connect to redis server. Is the server running?"
        sys.exit(1)


def f(x):
    res = x * x
    r.rpush("test", res)


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for num in fred:
            executor.submit(f, num)
    print r.lrange("test", 0, -1)


if __name__ == "__main__":
    check_server()
    r.delete("test")
    main()
This example uses the Redis list data structure.
Queue is not a good choice here because we are using processes, and Queue is made for threads: each worker process would work on its own copy of the queue, so the parent process would never see the results.
So the results are stored in a Redis list instead (see "redis: getting started").
Each individual Redis command is atomic, so different processes can safely write their results.
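The "only picklable objects" restriction quoted above can be explored without starting any processes. The sketch below simply tries to serialize a few objects with the standard pickle module; the helper name is_picklable is made up for illustration, and the check is only an approximation of what a process pool has to do when it sends work to its workers.

```python
import pickle


def is_picklable(obj):
    """Return True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    # A builtin function is pickled by reference to its module and name.
    print(is_picklable(pow))
    # A lambda has no importable name, so pickling it fails.
    print(is_picklable(lambda x: x * x))
    # Plain data such as a list of ints is fine.
    print(is_picklable([1, 4, 9]))
```

This is why the worker function f in Example 3 is a plain module-level function rather than a lambda or a nested function.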
【Test】
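1. With the source data set to range(1,1000), the timings are as follows:
[root@typhoeus79 20140811]# time ./basic.py
real 0m49.388s
user 0m0.024s
sys 0m0.013s
[root@typhoeus79 20140811]# time ./thread.py
real 0m12.687s
user 0m0.103s
sys 0m0.061s
[root@typhoeus79 20140811]# time ./process.py
real 0m0.507s
user 0m0.557s
sys 0m0.343s
Note that f in Example 3 does not call time.sleep, so, assuming process.py is Example 3, its time is not directly comparable with the other two.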
【Use cases】
Threads are good for I/O-bound tasks, while processes are good for CPU-bound tasks.
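The I/O half of this rule of thumb can be illustrated with a small timing sketch for Python 3. It assumes that time.sleep is a fair stand-in for a blocking I/O call; fake_io, timed_sequential and timed_threaded are hypothetical helper names, and the exact timings will vary from machine to machine.

```python
import concurrent.futures
import time


def fake_io(delay):
    """Stand-in for a blocking I/O call: sleeps, then returns the delay."""
    time.sleep(delay)
    return delay


def timed_sequential(delays):
    """Run the calls one after another and return the elapsed seconds."""
    start = time.perf_counter()
    for d in delays:
        fake_io(d)
    return time.perf_counter() - start


def timed_threaded(delays, max_workers=4):
    """Run the calls on a thread pool and return the elapsed seconds."""
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(fake_io, delays))  # consume the iterator to wait for all results
    return time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.05] * 8
    print("sequential: %.2fs" % timed_sequential(delays))
    print("threaded:   %.2fs" % timed_threaded(delays))
```

While one thread is sleeping (or waiting for I/O) the others can run, so the waits overlap. Pure computation in threads does not overlap in the same way because of the Global Interpreter Lock, which is where processes come in.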
【Executor】
class concurrent.futures.Executor
An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.
ThreadPoolExecutor and ProcessPoolExecutor are both subclasses of it.
submit(fn, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.
In other words, submit does not return the result of fn directly; it returns a Future from which the result can be retrieved later.
>>> with ThreadPoolExecutor(max_workers=1) as executor:
...     future = executor.submit(pow, 323, 1235)
...     print(future)
...
<Future at 0x7f1e7d053e10 state=finished returned long>
# Print the result
>>> with ThreadPoolExecutor(max_workers=1) as executor:
...     future = executor.submit(pow, 323, 1235)
...     print(future.result())
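When several jobs are submitted, the futures themselves can carry the results back, which makes a shared queue unnecessary. The following Python 3 sketch uses concurrent.futures.as_completed, a function of the same module that is not covered in the text above; square and squares_by_input are hypothetical names chosen for illustration.

```python
import concurrent.futures


def square(x):
    return x * x


def squares_by_input(numbers, max_workers=4):
    """Submit one job per number and return a {number: square} dict."""
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which input each Future belongs to.
        future_to_input = {executor.submit(square, n): n for n in numbers}
        # as_completed yields each future once it has finished.
        for future in concurrent.futures.as_completed(future_to_input):
            results[future_to_input[future]] = future.result()
    return results


if __name__ == "__main__":
    print(squares_by_input([1, 2, 3]))
```

Keeping a dict from Future to input is a common way to match results with the arguments that produced them, since the futures may finish in any order.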
map(func, *iterables, timeout=None)
Equivalent to map(func, *iterables) except func is executed asynchronously and several calls to func may be made concurrently. The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Executor.map(). timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time. If a call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
In short: func runs concurrently over the items of iterables, results are waited for at most timeout seconds (without limit if timeout is omitted or None), and an exception from a call is raised, not swallowed, when its value is retrieved from the iterator.
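The behaviour of map, in particular the point at which an exception surfaces, can be sketched in Python 3 as follows. The functions reciprocal and collect are hypothetical helpers written for this illustration.

```python
import concurrent.futures


def reciprocal(x):
    return 1.0 / x  # raises ZeroDivisionError for x == 0


def collect(values, timeout=None):
    """Map reciprocal over values; return (results so far, exception or None)."""
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        try:
            # Results are yielded in input order, not in completion order.
            for value in executor.map(reciprocal, values, timeout=timeout):
                results.append(value)
        except ZeroDivisionError as exc:
            # The worker's exception surfaces here, when its value is retrieved.
            return results, exc
    return results, None


if __name__ == "__main__":
    print(collect([1, 2, 4]))
    print(collect([1, 0, 4]))
```

In the second call the results before the failing item are still delivered; iteration stops at the item whose call raised.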
shutdown(wait=True)
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError.
It releases the resources the executor is using.
With a with statement there is no need to call this function explicitly: the with statement shuts the executor down on exit.
>>> with ThreadPoolExecutor(max_workers=4) as e:
...     e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
...     e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
...     e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
...     e.submit(shutil.copy, 'src3.txt', 'dest4.txt')
...
<Future at 0x7f1e79191250 state=running>
<Future at 0x7f1e79191450 state=finished raised IOError>
<Future at 0x7f1e79191250 state=running>
<Future at 0x7f1e79191450 state=finished raised IOError>
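Two details from this part can be checked with a short Python 3 sketch: submitting after shutdown raises RuntimeError, and a failed call (like the "raised IOError" futures in the session above) keeps its exception on the Future. The function names and the file path are made up for illustration; the path is assumed not to exist.

```python
import concurrent.futures


def submit_after_shutdown():
    """Return True if submitting to a shut-down executor raises RuntimeError."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(pow, 2, 10)
    executor.shutdown(wait=True)  # blocks until pending futures are done
    assert future.done()
    try:
        executor.submit(pow, 2, 11)
    except RuntimeError:
        return True
    return False


def failed_open_error():
    """Submit a call that fails and return the exception stored on its Future."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Hypothetical path that is assumed not to exist.
        future = executor.submit(open, "/nonexistent/dir/src1.txt")
    # exception() returns the exception raised by the call, or None.
    return future.exception()


if __name__ == "__main__":
    print(submit_after_shutdown())
    print(repr(failed_open_error()))
```

An exception inside a submitted call does not propagate on its own; it only becomes visible through future.exception() or when future.result() re-raises it.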
【References】
1. https://pythonadventures.wordpress.com/tag/threadpoolexecutor/
2. https://docs.python.org/dev/library/concurrent.futures.html#module-concurrent.futures