https://www.cnblogs.com/pdev/p/10685093.html
1. 以下為第一種,函數級的異步執行:
import time from concurrent.futures import ThreadPoolExecutor def task1_fn(url): time.sleep(10) return (url + " FINISHED") def task2_fn(url): time.sleep(3) return (url + " FINISHED") def RunPool(): pool = ThreadPoolExecutor(1) # 啟動一個線程池 task1 = pool.submit(task1_fn, "CPU1") # 在另外的線程中運行RunBenchmark() task2 = pool.submit(task2_fn, "CPU2") # 在另外的線程中運行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') if task1.done() and task2.done(): print('all is over.') break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
2. 類級函數的的異步執行,添加了線程強制中斷 pool.shutdown
import time from concurrent.futures import ThreadPoolExecutor class A(): def __init__(self): pass def test_ret(self): return 'hello world' def task1_fn(self, url): time.sleep(10) return (url + " FINISHED" + self.test_ret()) def task2_fn(self, url): time.sleep(3) return (url + " FINISHED" + self.test_ret()) def RunPool(): a = A() pool = ThreadPoolExecutor(2) # 啟動一個線程池 task1 = pool.submit(a.task1_fn, "CPU1") # 在另外的線程中運行RunBenchmark() task2 = pool.submit(a.task2_fn, "CPU2") # 在另外的線程中運行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') else: task1.cancel() pool.shutdown(wait=False) print('task1 result AAAAAAAA %s' % task1.result()) break # if task1.done() and task2.done(): # print('all is over.') # break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
3. 第一個任務一旦完成,則強制終止線程
應用場景:如果某個任務一直處於執行中,無法退出,此時就需要強制退出,而強制退出一般需要重新線程run方法,但 模塊 from concurrent.futures import ThreadPoolExecutor 重寫run方法較麻煩,故采用另一種方式
wait(f_lst, return_when='FIRST_COMPLETED')
f_lst中存放的是提交到線程池中的線程對象列表
import time import inspect import ctypes from concurrent.futures import ThreadPoolExecutor, wait class A(): def __init__(self): pass def test_ret(self): return 'hello world' def task1_fn(self, url): time.sleep(200) return (url + " FINISHED " + self.test_ret() + ' ' + str(var)) def task2_fn(self, url): time.sleep(5) return (url + " FINISHED " + self.test_ret()) def RunPool(): a = A() pool = ThreadPoolExecutor(2) # 啟動一個線程池 task1 = pool.submit(a.task1_fn, "CPU1") # 在另外的線程中運行RunBenchmark() task2 = pool.submit(a.task2_fn, "CPU2") # 在另外的線程中運行RunBenchmark() f_lst = [task1, task2] wait(f_lst, return_when='FIRST_COMPLETED') try: print('task status, task1: %s, task2: %s' % (task1.done(), task2.done())) # print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) except Exception as e: print('error: %s' % e) print("bye") if __name__ == '__main__': var = 123 RunPool()
注意:只有先完成的任務才可以使用 task.result() 方法,否則會一直卡着,等待 結果返回,而此時線程已被殺死,無法返回結果
4. 關鍵點說明:
1) task.cancel()當線程未運行之前可以取消,但是線程在線程池中啟動后,無法再通過此方式取消
2) pool.shutdown()函數入參 wait=True表示中斷線程前,等待直到線程執行完成才會中斷, wait=False 表示 線程未執行完成,強制中斷
3)傳參方式和位置參數一樣,不用使用元組
5. 適用業務:
多進程下的多線程,某個任務可能需要等待很長時間,此時就需要中斷此任務,或者去執行其他的任務,如果要求線性處理,中斷超時任務,就需要用此方法
針對上述情況下的單個進程,可以使用異步進程池 multiprocessing的apply_async方式,主進程和子進程同時執行,互不影響。
https://www.cnblogs.com/pdev/p/10685093.html
**1. 以下為第一種,函數級的異步執行:**`import timefrom concurrent.futures import ThreadPoolExecutor
import timefrom concurrent.futures import ThreadPoolExecutor def task1_fn(url): time.sleep(10) return (url + " FINISHED") def task2_fn(url): time.sleep(3) return (url + " FINISHED") def RunPool(): pool = ThreadPoolExecutor(1) # 啟動一個線程池 task1 = pool.submit(task1_fn, "CPU1") # 在另外的線程中運行RunBenchmark() task2 = pool.submit(task2_fn, "CPU2") # 在另外的線程中運行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') if task1.done() and task2.done(): print('all is over.') break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
**2. 類級函數的的異步執行,添加了線程強制中斷 pool.shutdown**
import timefrom concurrent.futures import ThreadPoolExecutor class A(): def __init__(self): pass def test_ret(self): return 'hello world' def task1_fn(self, url): time.sleep(10) return (url + " FINISHED" + self.test_ret()) def task2_fn(self, url): time.sleep(3) return (url + " FINISHED" + self.test_ret()) def RunPool(): a = A() pool = ThreadPoolExecutor(2) # 啟動一個線程池 task1 = pool.submit(a.task1_fn, "CPU1") # 在另外的線程中運行RunBenchmark() task2 = pool.submit(a.task2_fn, "CPU2") # 在另外的線程中運行RunBenchmark() cnt = 0 while True: if cnt == 10: task1.cancel() print("task1:%s" % task1.done()) print("task2:%s" % task2.done()) if task1.done(): print('task1 is over') else: task1.cancel() pool.shutdown(wait=False) print('task1 result AAAAAAAA %s' % task1.result()) break # if task1.done() and task2.done(): # print('all is over.') # break time.sleep(1) cnt += 2 print('task1 result:%s' % task1.result()) print('task2 result:%s' % task2.result()) print("bye") if __name__ == '__main__': RunPool()
**> 關鍵點說明:**1. task.cancel()當線程未運行之前可以取消,但是線程在線程池中啟動后,無法再通過此方式取消2. pool.shutdown()函數入參 wait=True表示中斷線程前,等待直到線程執行完成才會中斷, wait=False 表示 線程未執行完成,強制中斷
**> 適用業務:** 多進程下的多線程,某個任務可能需要等待很長時間,此時就需要中斷此任務,或者去執行其他的任務,如果要求線性處理,中斷超時任務,就需要用此方法 針對上述情況下的單個進程,可以使用異步進程池 multiprocessing的apply_async方式,主進程和子進程同時執行,互不影響。
總結:
#shutdown(wait=True) 相當於進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源后才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait參數為何值,整個程序都會等到所有任務執行完畢 submit和map必須在shutdown之前