# #!/usr/bin/env python
# # coding: utf-8
# # https://www.cnblogs.com/scolia/p/6132950.html
# # https://www.cnblogs.com/madsnotes/articles/5688681.html
import functools
import signal
import threading
import time
def set_timeout(num):
    """Build a decorator that limits a method call to ``num`` seconds.

    The limit is enforced with ``signal.alarm`` / ``SIGALRM``, so the
    decorated method has to be called from the main thread on a platform
    that provides ``SIGALRM``.

    Args:
        num: Timeout in whole seconds, passed straight to ``signal.alarm``.

    Returns:
        A decorator for methods (callables whose first argument is ``self``).
        The wrapped method returns whatever the original returns.

    Raises:
        TimeoutError: From the wrapped method when the alarm fires before the
            original method returns. Before raising, the wrapper calls
            ``self.coo.tmp_thread.stop()`` if such a thread object exists.
    """

    class _AlarmFired(RuntimeError):
        """Private marker raised by the SIGALRM handler.

        A dedicated subclass keeps a RuntimeError raised by the wrapped
        method itself from being mistaken for a timeout.
        """

    def wrap(func):
        def handle(signum, frame):
            # Cancel any pending alarm, then interrupt whatever the main
            # thread is currently doing.
            signal.alarm(0)
            raise _AlarmFired

        @functools.wraps(func)
        def todo(self, *args, **kwargs):
            print(f"in todo self:{self}")
            # Remember the previous handler so it can be put back afterwards.
            previous = signal.signal(signal.SIGALRM, handle)
            try:
                signal.alarm(num)  # arm the timer
                print("設置定時器完成")
                return func(self, *args, **kwargs)
            except _AlarmFired:
                print(f"signal超時,准備停止線程")
                # The worker may not have been created yet when the alarm
                # fires, so look it up defensively.
                worker = getattr(getattr(self, "coo", None), "tmp_thread", None)
                if worker is not None:
                    worker.stop()
                print(f"signal超時,停止線程 done")
                raise TimeoutError(f"call exceeded the {num}s timeout") from None
            finally:
                # Always disarm the timer so it cannot fire after we return,
                # and restore the handler that was installed before the call.
                signal.alarm(0)
                if previous is not None:
                    signal.signal(signal.SIGALRM, previous)

        return todo

    return wrap
class Job(threading.Thread):
    """Worker thread that prints a heartbeat once per second until stopped."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``threading.Thread`` and arm the run flag."""
        super().__init__(*args, **kwargs)
        # Event used as the "keep running" flag: set means keep looping.
        self.__running = threading.Event()
        self.__running.set()

    def run(self):
        """Print a heartbeat every second while the run flag is set.

        The flag is only checked between iterations, so the loop notices a
        ``stop()`` request after the current one-second sleep finishes.
        """
        while self.__running.is_set():
            print("子線程運行中... ", time.time())
            time.sleep(1)
            print("in while True")
        print("子線程退出....")
        print("子線程退出....")

    def stop(self):
        """Ask the thread to finish by clearing the run flag.

        Stopping is cooperative: if ``run`` is blocked, it cannot react to
        the cleared flag until it gets back to the loop condition.
        The one-second pauses around the flag change are kept from the
        original code (presumably to make the demo output easier to follow).
        """
        print("調用線程stop")
        time.sleep(1)
        self.__running.clear()  # flag becomes False
        print("調用線程stop done")
        time.sleep(1)
class Coo:
    """Owns the worker thread that a timeout handler may need to stop."""

    def __init__(self):
        # Most recently started worker thread; None until execute() is called.
        self.tmp_thread = None

    def execute(self):
        """Start a daemon ``Job`` thread and block until it finishes."""
        worker = Job()
        worker.daemon = True
        # Publish the thread before starting it so that code interrupting the
        # join() below always finds a thread object to stop.
        self.tmp_thread = worker
        worker.start()
        worker.join()
class CustErr(Exception):
    """Custom exception type defined by this script."""
class Foo:
    """Entry object whose main method runs under a three-second timeout."""

    def __init__(self):
        self.coo = Coo()

    # NOTE: an earlier inline variant installed a SIGALRM handler inside
    # foo_main and caught CustErr to stop the worker; the set_timeout
    # decorator now performs that job.
    @set_timeout(num=3)
    def foo_main(self):
        """Run the worker via ``Coo.execute`` under the decorator's timeout."""
        self.coo.execute()
def main():
    """Create a ``Foo`` and invoke its time-limited ``foo_main`` method."""
    a = Foo()
    a.foo_main()
from multiprocessing import Process

# Launch the demo only when this file is executed as a script. Without the
# guard, importing the module (which child processes do under the "spawn"
# start method) would start another process every time.
if __name__ == "__main__":
    p1 = Process(target=main, args=())
    p1.start()
    print("主線程結束退出")