python線程池ThreadPoolExecutor與進程池ProcessPoolExecutor


 python中ThreadPoolExecutor(線程池)與ProcessPoolExecutor(進程池)都是concurrent.futures模塊下的,主線程(或進程)中可以獲取某一個線程(進程)執行的狀態或者某一個任務執行的狀態及返回值。

通過submit返回的是一個future對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態

 

ThreadPoolExecutor(線程池)

 

通過submit函數提交執行的函數到線程池中,done()判斷線程執行的狀態:
 1 import time  2 from concurrent.futures import ThreadPoolExecutor  3 
 4 def get_thread_time(times):  5  time.sleep(times)  6     return times  7 
 8 # 創建線程池 指定最大容納數量為4
 9 executor = ThreadPoolExecutor(max_workers=4) 10 # 通過submit提交執行的函數到線程池中
11 task1 = executor.submit(get_thread_time, (1)) 12 task2 = executor.submit(get_thread_time, (2)) 13 task3 = executor.submit(get_thread_time, (3)) 14 task4 = executor.submit(get_thread_time, (4)) 15 print("task1:{} ".format(task1.done())) 16 print("task2:{}".format(task2.done())) 17 print("task3:{} ".format(task3.done())) 18 print("task4:{}".format(task4.done())) 19 time.sleep(2.5) 20 print('after 2.5s {}'.format('-'*20)) 21 
22 done_map = { 23     "task1":task1.done(), 24     "task2":task2.done(), 25     "task3":task3.done(), 26     "task4":task4.done() 27 } 28 # 2.5秒之后,線程的執行狀態
29 for task_name,done in done_map.items(): 30     if done: 31         print("{}:completed".format(task_name))

result:

task1:False task2:False task3:False task4:False after 2.5s -------------------- task1:completed task2:completed

初始狀態4個task都是未完成狀態,2.5秒后task1和task2執行完成,task3和task由於是sleep(3) sleep(4)所以仍然是未完成的sleep狀態

通過wait()判斷線程執行的狀態:

wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3個參數,fs表示執行的task序列;timeout表示等待的最長時間,超過這個時間即使線程未執行完成也將返回;return_when表示wait返回結果的條件,默認為ALL_COMPLETED全部執行完成再返回:

 1 import time  2 from concurrent.futures import (  3  ThreadPoolExecutor, wait  4 )  5 
 6 
 7 def get_thread_time(times):  8  time.sleep(times)  9     return times 10 
11 
12 start = time.time() 13 executor = ThreadPoolExecutor(max_workers=4) 14 task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]] 15 i = 1
16 for task in task_list: 17     print("task{}:{}".format(i, task)) 18     i += 1
19 print(wait(task_list, timeout=2.5))

wait在2.5秒后返回線程的狀態,result:

task1:<Future at 0x7ff3c885f208 state=running> task2:<Future at 0x7ff3c885fb00 state=running> task3:<Future at 0x7ff3c764b2b0 state=running> task4:<Future at 0x7ff3c764b9b0 state=running> DoneAndNotDoneFutures(
done
={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>},

not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>})

可以看到在timeout 2.5時,task1和task2執行完畢,task3和task4仍在執行中

通過map返回線程的執行結果:

 1 import time  2 from concurrent.futures import ThreadPoolExecutor  3 
 4 
 5 def get_thread_time(times):  6  time.sleep(times)  7     return times  8 
 9 
10 start = time.time() 11 executor = ThreadPoolExecutor(max_workers=4) 12 
13 i = 1
14 for result in executor.map(get_thread_time,[2,3,1,4]): 15     print("task{}:{}".format(i, result)) 16     i += 1

 

map(fn, *iterables, timeout=None),第一個參數fn是線程執行的函數;第二個參數接受一個可迭代對象;第三個參數timeout跟wait()的timeout一樣,但由於map是返回線程執行的結果,如果timeout小於線程執行時間會拋異常TimeoutError。

import time from concurrent.futures import ThreadPoolExecutor def get_thread_time(times): time.sleep(times) return times start = time.time() executor = ThreadPoolExecutor(max_workers=4) i = 1
for result in executor.map(get_thread_time,[2,3,1,4]): print("task{}:{}".format(i, result)) i += 1

map的返回是有序的,它會根據第二個參數的順序返回執行的結果:

task1:2 task2:3 task3:1 task4:4

 

as_completed返回線程執行結果:
 1 import time  2 from collections import OrderedDict  3 from concurrent.futures import (  4  ThreadPoolExecutor, as_completed  5 )  6 
 7 
 8 def get_thread_time(times):  9  time.sleep(times) 10     return times 11 
12 
13 start = time.time() 14 executor = ThreadPoolExecutor(max_workers=4) 15 task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]] 16 task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4])) 17 task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"])) 18 
19 for result in as_completed(task_list): 20     task_name = task_map.get(result) 21     print("{}:{}".format(task_name,task_to_time.get(task_name)))
 
        

task1、task2、task3、task4的等待時間分別為2s、3s、1s、4s,通過as_completed返回執行完的線程結果,as_completed(fs, timeout=None)接受2個參數,第一個是執行的線程列表,第二個參數timeout與map的timeout一樣,當timeout小於線程執行時間會拋異常TimeoutError。

task3:1 task1:2 task2:3 task4:4

通過執行結果可以看出,as_completed返回的順序是線程執行結束的順序,最先執行結束的線程最早返回。

 

ProcessPoolExecutor

對於頻繁的cpu操作,由於GIL鎖的原因,多個線程只能用一個cpu,這時多進程的執行效率要比多線程高。

線程池操作斐波拉切:

 1 import time  2 from concurrent.futures import ThreadPoolExecutor  3 
 4 
 5 def fib(n):  6     if n < 3:  7         return 1
 8     return fib(n - 1) + fib(n - 2)  9 
10 
11 start_time = time.time() 12 executor = ThreadPoolExecutor(max_workers=4) 13 task_list = [executor.submit(fib, n) for n in range(3, 35)] 14 thread_results = [task.result() for task in as_completed(task_list)] 15 print(thread_results) 16 print("ThreadPoolExecutor time is: {}".format(time.time() - start_time))

result:

[8, 5, 3, 2, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 10946, 46368, 6765, 28657, 17711, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887] ThreadPoolExecutor time is: 4.998981237411499

 

進程池操作斐波拉切:

 1 import time  2 from concurrent.futures import ProcessPoolExecutor  3 
 4 
 5 def fib(n):  6     if n < 3:  7         return 1
 8     return fib(n - 1) + fib(n - 2)  9 
10 
11 start_time = time.time() 12 executor = ProcessPoolExecutor(max_workers=4) 13 task_list = [executor.submit(fib, n) for n in range(3, 35)] 14 process_results = [task.result() for task in as_completed(task_list)] 15 print(process_results) 16 print("ProcessPoolExecutor time is: {}".format(time.time() - start_time))

result:

[2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 75025, 28657, 46368, 196418, 121393, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887] ProcessPoolExecutor time is: 3.3585257530212402

可以看出,對於頻繁cpu操作進程是優於線程的,3.3s<4.9s

ProcessPoolExecutor在使用上和ThreadPoolExecutor大致是一樣的,它們在futures中的方法也是相同的,但是對於map()方法ProcessPoolExecutor會多一個參數chunksize(ThreadPoolExecutor中這個參數沒有任何作用),chunksize將迭代對象切成塊,將其作為分開的任務提交給pool,對於很大的iterables,設置較大chunksize可以提高性能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM