自己在項目的開發中,一般能避免在單個進程中使用多線程就盡量把每個線程包裝成獨立的進程執行,通過socket或者一些中間件比如redis進行通訊,工作,協調。
但有時候必須涉及到多線程操作,而且碰到的情況中,多個線程必須協調全部正常工作才能執行邏輯,但子線程有着自己的棧區,報錯了並不影響其它的線程,導致整個進程無法退出。
我當時想到的有兩種思路,一種是多個線程間進行通訊或者一個全局變量的標記,當報錯的時候,就修改這個標記,所有的子線程定時去查詢這個標記,但感覺這個思路的拓展性太差,而且每個子線程需要主動定期查詢或者通訊,太麻煩了。
后面一種就是我准備上代碼的思路, 將所有的子線程設計成守護線程,主線程循環查詢子線程的狀態值,當發現任意的子線程狀態異常,獲取該子線程的異常對象,並上浮,退出主線程,導致所有的子線程退出。
import threading, traceback import time class ExcThread(threading.Thread): def __init__(self, call_obj, *args, **kwargs): super(ExcThread, self).__init__(*args, **kwargs) self.callable_obj = call_obj # 自己設置的退出狀態值 self.exit_code = 0 self.exception = None self.exc_traceback = '' # 主動設置為守護線程,必須條件 self.setDaemon(True) def run(self): try: self._run() except Exception as e: self.exit_code = 1 # 存儲異常對象保存在實例對象中 self.exception = e self.exc_traceback = traceback.format_exc() def _run(self): try: self.callable_obj(*self._args, **self._kwargs) except Exception as e: raise e def t_func(name, age=18): while 1: print(name, age) time.sleep(3) if age == 1: raise Exception('hee') # 生成一份子線程列表對象,用於主線程輪詢檢查使用 def start_child_thread(): thread_task_list = [] for i in range(3): f = ExcThread(call_obj=t_func, args=('sidian',), kwargs={'age': i}) f.start() thread_task_list.append(f) return thread_task_list def check_thread(): t_list = start_child_thread() while 1: for task in t_list: if not task.is_alive(): raise task.exception time.sleep(1) if __name__ == '__main__': check_thread()
2022年2月22日更新。
由於項目的改版,重新需要用到多線程,個人定制版的Thread
class ExcThread(threading.Thread): def __init__(self, target=None, *args, name=None, **kwargs): if name is None: name = target.__name__ super(ExcThread, self).__init__(None, target, name, args, kwargs) self.exit_code = None self.exception = None self.exc_traceback = '' # 默認為守護線程 self.setDaemon(True) def run(self): try: self._run() except Exception as e: self.exception = e self.exit_code = 1 self.exc_traceback = traceback.format_exc() else: self.exit_code = 0 def _run(self): try: self._target(*self._args, **self._kwargs) except Exception as e: raise e
主線程巡查各子線狀態
# 主線程檢測子線程運行,接受到子線程死亡信號,上浮子線程錯誤信息 def _check_child_thread_status(self): while True: for task in self._thread_task_list.copy(): # 已經完成的任務刪除 if not task.is_alive(): self._thread_task_list.remove(task) if task.exit_code: print(f'{datetime.datetime.now()} task: {task.name} raise exception.', file=sys.stderr) print(f'{datetime.datetime.now()} task: {task.name} raise exception.') try: raise task.exception except: l = logger_sd(stream_display=False, file_out=False, mail_send=True) l.critical(f'*-smt-*\n {traceback.format_exc()} \n *-smt-*') print(traceback.format_exc()) raise time.sleep(.1)