先描述一下場景:
我有一批任務需要放入線程池中去處理,但是一旦線程池中有1個任務出現了異常(拋了Exception)就將線程中尚未開始的任務全部取消不執行。
需要說明的是正在執行的任務因為無法撤銷,所以正在執行的任務只能繼續執行,等他執行完成。
import queue from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED, FIRST_EXCEPTION def send_cmd(ip, exec_queue): # 如果消息隊列中消息不為空,說明已經有任務異常了 if not exec_queue.empty(): return try: # 需要執行的主任務 except Exception as e: # 如果任務異常了就在隊列中寫入一個消息,用於鎖住線程池 exec_queue.put("Termination") # 此處一定要將異常再次拋出,否則主線程池無法捕獲異常,會統一認定為任務已被取消 raise Exception(e) # 此處使用消息隊列作為線程池鎖,避免在第一個任務異常發生后到主線程獲知中間仍然有任務被發送執行 exec_queue = queue.Queue() with ThreadPoolExecutor(max_workers=thread_pool_size) as executor: task_dict, task_list = {}, [] # 將任務全部放入線程池中 for ip in ip_list: task = executor.submit(send_cmd, ip, exec_queue) task_dict[task] = ip task_list.append(task) # 等待第一個任務拋出異常,就阻塞線程池 wait(task_list, return_when=FIRST_EXCEPTION) # 反向序列化之前塞入的任務隊列,並逐個取消 for task in reversed(task_list): task.cancel() # 等待正在執行任務執行完成 wait(task_list, return_when=ALL_COMPLETED) for task in task_list: if task_dict.get(task): if "finished returned NoneType" in str(task) or task.cancelled(): print("{}被取消".format(task_dict.get(task))) elif "finished raised Exception" in str(task): print("{}執行異常".format(task_dict.get(task))) else: print("{}執行成功".format(task_dict.get(task)))
加入隊列作為線程池鎖,是因為在實際測試中發現,如果所有任務被一次性塞入線程池后,當第一個異常發生到,異常被主線程池捕獲中間,仍然會有任務被執行,具體數量依據任務執行的快慢,數量也是不定的,但至少會有1個,屬於必現情況。加入線程池,可以有效阻止這類問題的發生。