0. The concurrent.futures library
Reference: https://docs.python.org/3/library/concurrent.futures.html
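Until now we have used multithreading (threading) and multiprocessing (multiprocessing) for routine needs: the start and join steps cannot be skipped at launch, and more complex requirements also need one or two queues. As requirements grow, without good design and a clean abstraction of this layer, the more code there is, the harder it becomes to debug. Is there a good way to abstract these steps away so that we can stop worrying about such details and travel light?

The answer is yes. Starting with Python 3.2, a module called concurrent.futures was added to the standard library; in Python 2 it is the third-party futures library, which has to be installed manually:

pip install futures

The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed by threads using ThreadPoolExecutor or separate processes using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.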
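As a minimal sketch of what that shared interface looks like in practice, the snippet below drives a pool through submit() and map(). The square function and the run_with helper are made up for illustration; only submit, map, Future.result and the with-statement behaviour come from the standard library.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def square(x):
    """Hypothetical stand-in for real work."""
    return x * x


def run_with(executor_cls, values):
    """Run square() over values with any Executor subclass."""
    # Leaving the with-block calls shutdown(wait=True), so there is
    # no explicit start()/join() bookkeeping.
    with executor_cls(max_workers=2) as executor:
        assert isinstance(executor, Executor)
        # submit() schedules one call and returns a Future immediately.
        future = executor.submit(square, 7)
        # map() schedules many calls and yields results in input order.
        mapped = list(executor.map(square, values))
        # result() blocks until the submitted call has finished.
        return future.result(), mapped


if __name__ == "__main__":
    print(run_with(ThreadPoolExecutor, [1, 2, 3]))  # (49, [1, 4, 9])
```

Passing ProcessPoolExecutor instead of ThreadPoolExecutor should work the same way, provided the script is run as the main module and the submitted function is picklable.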
1. Process pools
- Serial execution:
import math
import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    # Guard the edge cases that the odd-divisor loop below does not cover.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    # Only odd divisors up to sqrt(n) need to be tried.
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    # Check the numbers one after another in a single process.
    for num in PRIMES:
        print('%d is prime: %s' % (num, is_prime(num)))

if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print('Run time is %s' % (end_time - start_time))

--- Output ---
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Run time is 3.9570000171661377
- Using multiprocessing.Pool:
import math
import time
from multiprocessing import Pool

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    # Guard the edge cases that the odd-divisor loop below does not cover.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    # Only odd divisors up to sqrt(n) need to be tried.
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    pool = Pool()
    res_l = []
    # Submit one asynchronous task per number and keep the AsyncResult objects.
    for prime in PRIMES:
        res = pool.apply_async(func=is_prime, args=(prime,))
        res_l.append(res)
    # No more tasks will be submitted; wait for the workers to finish.
    pool.close()
    pool.join()
    for number, prime in zip(PRIMES, res_l):
        print('%d is prime: %s' % (number, prime.get()))

if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print('Run time is %s' % (end_time - start_time))

--- Output ---
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Run time is 2.687000036239624
- Using the process pool concurrent.futures.ProcessPoolExecutor:
- Reference: http://pythonhosted.org/futures/#concurrent.futures.ProcessPoolExecutor
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

Executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given then as many worker processes will be created as the machine has processors.
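The "only picklable objects" restriction can be checked before anything is handed to a process pool. The following is a rough sketch; is_picklable is a hypothetical helper name, not part of concurrent.futures, and it only tries pickle.dumps on the object.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if the pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    # A function that can be imported by name is pickled by reference.
    print(is_picklable(math.sqrt))
    # A lambda has no importable name, so pickling it fails.
    print(is_picklable(lambda x: x * 2))
```

This is why the examples in this section pass a module-level function such as is_prime to the pool rather than a lambda or a nested function.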
- Under the hood, ProcessPoolExecutor also relies on the multiprocessing module.
import math
import time
from concurrent import futures

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    # Guard the edge cases that the odd-divisor loop below does not cover.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    # Only odd divisors up to sqrt(n) need to be tried.
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    # The with-block shuts the pool down and waits for the workers on exit.
    with futures.ProcessPoolExecutor() as executor:
        # executor.map yields results in the same order as PRIMES.
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print('Run time is %s' % (end_time - start_time))

--- Output ---
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Run time is 2.482999801635742
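Pairing the inputs with executor.map through zip works because map yields results in input order, not in completion order. A small sketch of that property follows; slow_echo and map_in_order are hypothetical names, and a ThreadPoolExecutor is used only so the snippet runs without spawning processes (the map call itself is the same on both executors).

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(delay):
    """Hypothetical task: sleep for delay seconds, then return delay."""
    time.sleep(delay)
    return delay


def map_in_order(delays):
    """Run all tasks at once and collect the results via map()."""
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        # The task with the longest delay finishes last, but map()
        # still yields its result in the position of its input.
        return list(executor.map(slow_echo, delays))


if __name__ == "__main__":
    print(map_in_order([0.2, 0.0, 0.1]))  # [0.2, 0.0, 0.1]
```

When results should be handled as soon as each one is ready, submit combined with as_completed is the usual alternative.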
2. Thread pools
- Reference: http://pythonhosted.org/futures/#threadpoolexecutor-objects
The ThreadPoolExecutor class is an Executor subclass that uses a pool of threads to execute calls asynchronously.

class concurrent.futures.ThreadPoolExecutor(max_workers)

Executes calls asynchronously using a pool of at most max_workers threads.
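To see the "at most max_workers threads" limit at work, the sketch below counts how many tasks are running at the same moment. The helper name measure_peak_concurrency and the 0.05 second sleep are arbitrary choices made for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def measure_peak_concurrency(max_workers, n_tasks):
    """Return the largest number of tasks seen running simultaneously."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task(_):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # pretend to do some I/O
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Consume the iterator so that every task has completed.
        list(executor.map(task, range(n_tasks)))
    return peak


if __name__ == "__main__":
    # Ten tasks are queued, but never more than three run at once.
    print(measure_peak_concurrency(max_workers=3, n_tasks=10))
```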
- Serial execution:
import urllib.request
import time

URLS = [
    'http://www.foxnews.com/',
    'https://www.stanford.edu/',
    'http://www.mit.edu/',
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'http://www.ox.ac.uk/'
]

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

start_time = time.time()
# Fetch the pages one after another.
for url in URLS:
    print('%r page is %d bytes' % (url, len(load_url(url, 60))))
end_time = time.time()
print("Run time is %s" % (end_time - start_time))

--- Output ---
'http://www.foxnews.com/' page is 71131 bytes
'https://www.stanford.edu/' page is 68595 bytes
'http://www.mit.edu/' page is 21405 bytes
'https://www.python.org/' page is 47701 bytes
'https://www.yahoo.com/' page is 434510 bytes
'http://www.ox.ac.uk/' page is 93411 bytes
Run time is 5.068000078201294
- Using multiple threads:
import urllib.request
import time
from threading import Thread

URLS = [
    'http://www.foxnews.com/',
    'https://www.stanford.edu/',
    'http://www.mit.edu/',
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'http://www.ox.ac.uk/'
]

def load_url(url, timeout):
    # A Thread does not hand back a return value, so print inside the worker.
    res = urllib.request.urlopen(url, timeout=timeout).read()
    print('%r page is %d bytes' % (url, len(res)))

t_l = []
start_time = time.time()
# Start one thread per URL, then wait for all of them.
for url in URLS:
    t = Thread(target=load_url, args=(url, 60))
    t_l.append(t)
    t.start()
for t in t_l:
    t.join()
end_time = time.time()
print("Run time is %s" % (end_time - start_time))

--- Output ---
'http://www.mit.edu/' page is 21403 bytes
'http://www.foxnews.com/' page is 71735 bytes
'https://www.python.org/' page is 47701 bytes
'https://www.stanford.edu/' page is 69130 bytes
'http://www.ox.ac.uk/' page is 93411 bytes
'https://www.yahoo.com/' page is 446715 bytes
Run time is 2.6540000438690186
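In the hand-written threading version the worker prints its own result, because a Thread target cannot return a value to the caller. Getting the values back typically takes a queue, which is the extra plumbing mentioned in the introduction. A rough sketch under assumptions: fake_load is a hypothetical stand-in for load_url that does no network I/O, and load_all_with_queue is a made-up helper name.

```python
import queue
from threading import Thread


def fake_load(url):
    """Hypothetical stand-in for load_url(); returns a fake page size."""
    return len(url)


def worker(url, results):
    # Threads cannot return values, so push the result onto a queue.
    results.put((url, fake_load(url)))


def load_all_with_queue(urls):
    results = queue.Queue()
    threads = [Thread(target=worker, args=(url, results)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Every worker has finished, so the queue can be drained safely.
    sizes = {}
    while not results.empty():
        url, size = results.get()
        sizes[url] = size
    return sizes


if __name__ == "__main__":
    print(load_all_with_queue(["a", "bb", "ccc"]))
```

With an executor, the Future returned by submit carries the return value, so neither the queue nor the worker wrapper is needed.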
- Using the thread pool concurrent.futures.ThreadPoolExecutor:
from concurrent import futures
import urllib.request
import time

URLS = [
    'http://www.foxnews.com/',
    'https://www.stanford.edu/',
    'http://www.mit.edu/',
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'http://www.ox.ac.uk/'
]

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

start_time = time.time()
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Map each Future back to the URL it was submitted for.
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    # as_completed yields each Future as soon as it finishes.
    for future in futures.as_completed(future_to_url):
        url = future_to_url[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (url, future.exception()))
        else:
            print('%r page is %d bytes' % (url, len(future.result())))
end_time = time.time()
print("Run time is %s" % (end_time - start_time))

--- Output ---
'http://www.mit.edu/' page is 21405 bytes
'http://www.foxnews.com/' page is 71197 bytes
'https://www.python.org/' page is 47701 bytes
'http://www.ox.ac.uk/' page is 93411 bytes
'https://www.yahoo.com/' page is 444854 bytes
'https://www.stanford.edu/' page is 68595 bytes
Run time is 2.497999906539917
Note: because network conditions fluctuate, the run times above are for reference only.
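The submit and as_completed pattern from the thread pool example can also be tried without network access. In the sketch below, fake_load is a hypothetical replacement for load_url that fails on purpose for any URL containing "bad", and load_all_with_pool is a made-up helper; errors are caught around future.result(), which re-raises whatever the worker raised.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_load(url):
    """Hypothetical stand-in for load_url(); fails for 'bad' URLs."""
    if "bad" in url:
        raise ValueError("cannot load %s" % url)
    return b"x" * len(url)


def load_all_with_pool(urls):
    sizes, errors = {}, {}
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Map each Future back to the URL it was submitted for.
        future_to_url = {executor.submit(fake_load, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                # result() re-raises the exception from the worker, if any.
                sizes[url] = len(future.result())
            except ValueError as exc:
                errors[url] = str(exc)
    return sizes, errors


if __name__ == "__main__":
    print(load_all_with_pool(["http://ok/", "http://bad/"]))
```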