# #!/usr/bin/env python
# # coding: utf-8
# # https://www.cnblogs.com/scolia/p/6132950.html
# # https://www.cnblogs.com/madsnotes/articles/5688681.html
import functools
import signal
import threading
import time
from multiprocessing import Process


def set_timeout(num):
    """Build a method decorator that aborts the call after ``num`` seconds.

    The timeout is implemented with ``SIGALRM``, so the decorated method must
    be invoked from the main thread of a process on a platform that provides
    ``signal.alarm``.

    The decorated method's instance is expected to expose ``self.coo`` with a
    ``tmp_thread`` attribute; on timeout that thread's ``stop()`` is called.

    Args:
        num: Whole number of seconds to wait before the alarm fires.

    Returns:
        A decorator for instance methods.

    Raises:
        Exception: Raised by the decorated method when the alarm fires. A
            ``RuntimeError`` raised by the wrapped method itself is handled
            the same way, because the alarm handler signals a timeout by
            raising ``RuntimeError``.
    """

    def wrap(func):
        def handle(signum, frame):
            # Disarm the timer, then interrupt whatever the main thread is
            # currently doing (e.g. a blocking ``Thread.join``).
            signal.alarm(0)
            raise RuntimeError

        @functools.wraps(func)
        def todo(self, *args, **kwargs):
            print(f"in todo self:{self}")
            previous = signal.signal(signal.SIGALRM, handle)
            try:
                signal.alarm(num)  # arm the timer
                print("設置定時器完成")
                return func(self, *args, **kwargs)
            except RuntimeError as exc:
                print("signal超時,准備停止線程")
                worker = self.coo.tmp_thread
                # The alarm may fire before the worker has been stored.
                if worker is not None:
                    worker.stop()
                print("signal超時,停止線程 done")
                raise Exception from exc
            finally:
                # Always disarm the timer so a stale alarm cannot fire in
                # unrelated code after this call has returned.
                signal.alarm(0)
                # ``signal.signal`` returns None when the previous handler
                # was not installed from Python; that cannot be re-installed.
                if previous is not None:
                    signal.signal(signal.SIGALRM, previous)

        return todo

    return wrap


class Job(threading.Thread):
    """Worker thread that loops until :meth:`stop` clears its run flag."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Event used as the "keep running" flag; set means "running".
        self.__running = threading.Event()
        self.__running.set()

    def run(self):
        """Print a heartbeat once per second until the run flag is cleared."""
        while self.__running.is_set():
            print("子線程運行中... ", time.time())
            time.sleep(1)
            print("in while True")
        print("子線程退出....")
        print("子線程退出....")

    def stop(self):
        """Clear the run flag so :meth:`run` returns on its next check.

        The flag is only inspected once per loop iteration, so a worker that
        is blocked inside a call cannot be stopped this way.
        """
        print("調用線程stop")
        time.sleep(1)
        self.__running.clear()
        print("調用線程stop done")
        time.sleep(1)


class Coo():
    """Owns the worker thread so that other objects can stop it."""

    def __init__(self):
        # Most recently started Job, or None before ``execute`` is called.
        self.tmp_thread = None

    def execute(self):
        """Start a daemon :class:`Job` and block until it finishes."""
        t = Job()
        t.daemon = True
        # Publish the thread before starting it so a timeout handler can
        # always find it.
        self.tmp_thread = t
        t.start()
        t.join()


class CustErr(Exception):
    """Custom error type; not raised by the code in this file."""


class Foo():
    """Runs ``Coo.execute`` under a three second timeout."""

    def __init__(self):
        self.coo = Coo()

    @set_timeout(num=3)
    def foo_main(self):
        """Run the worker; the decorator stops it when the timeout fires."""
        self.coo.execute()


def main():
    """Entry point executed inside the child process."""
    a = Foo()
    a.foo_main()


if __name__ == "__main__":
    # The guard keeps child processes (spawn start method) and plain imports
    # from launching another process.
    p1 = Process(target=main, args=())
    p1.start()
    print("主線程結束退出")