# 0x0001: limit run time by running the function in a separate process
from multiprocessing import Process
from time import sleep
def f(time):
    """Sleep for ``time`` seconds, then print ``time``.

    Used below as a sample workload whose duration is controlled by its
    argument.
    """
    sleep(time)
    print(time)
def run_with_limited_time(func, args, kwargs, time):
    """Run ``func(*args, **kwargs)`` in a child process for at most ``time`` seconds.

    Args:
        func: Callable executed in a separate process.
        args: Positional arguments for ``func``; ``None`` is treated as no
            arguments.
        kwargs: Keyword arguments for ``func``; ``None`` is treated as no
            arguments.
        time: Maximum number of seconds to wait for the child process.

    Returns:
        True if the child process was no longer running when the wait ended
        (however it ended), False if it was still alive and was terminated.
    """
    p = Process(target=func, args=args or (), kwargs=kwargs or {})
    p.start()
    p.join(time)
    if p.is_alive():
        p.terminate()
        # Reap the terminated child so it does not linger as a zombie.
        p.join()
        return False
    return True
if __name__ == '__main__':
    # f sleeps 1.5 s, which fits inside the 2.5 s limit.
    print(run_with_limited_time(f, (1.5, ), {}, 2.5))  # True
    # f would sleep 3.5 s, so it is terminated at the 2.5 s limit.
    print(run_with_limited_time(f, (3.5, ), {}, 2.5))  # False
# 0x0002: limit run time with a timer thread that interrupts the main thread
from contextlib import contextmanager
import threading
import _thread
class TimeoutException(Exception):
    """Raised when an operation exceeds its time limit.

    Attributes:
        msg: Description supplied by the code that raised the exception.
    """

    def __init__(self, msg=''):
        # Forward the message to Exception so that str(exc) and exc.args
        # carry it even when ``msg`` is passed as a keyword argument.
        super().__init__(msg)
        self.msg = msg
@contextmanager
def time_limit(seconds, msg=''):
    """Context manager that aborts its body after ``seconds`` seconds.

    A background timer interrupts the main thread when the limit expires;
    that interrupt surfaces as ``KeyboardInterrupt`` and is converted into
    ``TimeoutException``. Because the interrupt targets the main thread, the
    ``with`` block is only interrupted when it runs there.

    Args:
        seconds: Time limit in seconds.
        msg: Label for the operation, included in the timeout message.

    Raises:
        TimeoutException: If the timer fired while the body was running.
        KeyboardInterrupt: If the body was interrupted by something other
            than this timer (for example a real Ctrl-C).
    """
    timed_out = threading.Event()

    def _expire():
        # Record that the interrupt comes from this timer, so it can be
        # told apart from a genuine Ctrl-C.
        timed_out.set()
        _thread.interrupt_main()

    timer = threading.Timer(seconds, _expire)
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        if not timed_out.is_set():
            # Not caused by the timer: let the real interrupt propagate.
            raise
        # ``from None`` hides the internal KeyboardInterrupt from the traceback.
        raise TimeoutException("Timed out for operation {}".format(msg)) from None
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()
import time
# Demo: the loop asks for ten 1-second iterations, but the 5-second limit
# passed to time_limit ends it early.
with time_limit(5, 'sleep'):
    n = 0
    for _ in range(10):
        n += 1
        print(n)
        time.sleep(1)
# 參考 (References)