Python 在不依賴第三方庫的前提下,對於定時器的實現並不是很完美,但是這不意味着我們無法實現。
閱讀了網上的一些資料,得出一些結論,順手寫了一個基類的定時器(Python3)
BaseTimer:
1 # -*- coding: utf-8 -*- 2 3 4 from abc import ABCMeta, abstractmethod 5 import time 6 import threading 7 8 class BaseTimer(threading.Thread): 9 """ 10 基礎定時器抽象類,可以隨意繼承本類並且實現exec抽象方法即可定時產生任務 11 """ 12 __metaclass__ = ABCMeta 13 def __init__(self,howtime=1.0,enduring=True): 14 """ 15 howtime 每次定時的時間 16 enduring 是否是一個持久性的任務,用這個可以控制開關 17 """ 18 19 self.enduring = enduring 20 self.howtime = howtime 21 threading.Thread.__init__(self) 22 23 def run(self): 24 time.sleep(self.howtime) #至少執行一次 任務 25 self.exec() 26 while self.enduring: #是否持久,或者是否進行運行 27 time.sleep(self.howtime) 28 self.exec() #每次開始執行任務 29 30 @abstractmethod 31 def exec(self): 32 """抽象方法,子類實現""" 33 pass 34 35 def destroy(self): 36 """銷毀自身""" 37 self.enduring = False 38 del self 39 40 def stop(self): 41 self.enduring = False 42 43 def restart(self): 44 self.enduring = True 45 46 def get_status(self): 47 return self.enduring 48
如何使用?
我們來建立一個新的任務,這個任務是過一段時間就輸出:
1 class TimerMask(BaseTimer): 2 """定時任務類,不同的業務邏輯""" 3 def __init__(self,howtime=1.0,enduring=True): 4 BaseTimer.__init__(self,howtime,enduring) 5 self.ws=0 6 7 def exec(self): 8 print("HelloWorld!") 9 self.ws = self.ws + 1 #這里是過0.8秒輸出一次 10 if self.ws >5: 11 self.destroy()
加入如下:
1 if __name__ == "__main__": 2 Q_w = 0 3 w = TimerMask(howtime=0.8) 4 print("-") 5 w.start() 6 #這里線程輸出這些,做其他事情的 7 while True: 8 time.sleep(0.4) #0.4秒 9 print("- ",Q_w," - WMask:",w) 10 Q_w += 1 11 pass
輸出:

於是你可能會想問,那豈不是每個不同的行為都要寫一個繼承了BaseTimer的類來做事呢,其實不然,你可以寫個函數調用的TimerFunc類:
1 class TimerFunc(BaseTimer): 2 """可傳遞任何函數的定時任務類""" 3 def __init__(self,func,howtime=1.0,enduring=True): 4 BaseTimer.__init__(self,howtime,enduring) 5 self.func = func 6 7 def exec(self): 8 self.func() #調用函數 9 10 #使用方法: 11 def doing(): 12 print("Hello") 13 14 w = TimerFunc(doing) 15 w.start()
輸出:"Hello",並且會每隔1秒執行一次
是不是覺得可以做一堆事情了?你可以自由發揮,繼承BaseTimer類
1 w = TimerFunc(doing,5,False) #這樣就可以定義延遲5秒使用了~ 2 w.start()
在搜索資料的時候,找到了網上大部分的實現方法,其實都差不多,感興趣你可以看看:
1 import threading ,time 2 from time import sleep, ctime 3 class Timer(threading.Thread): 4 """ 5 very simple but useless timer. 6 """ 7 def __init__(self, seconds): 8 self.runTime = seconds 9 threading.Thread.__init__(self) 10 def run(self): 11 time.sleep(self.runTime) 12 print ("Buzzzz!! Time's up!") 13 14 class CountDownTimer(Timer): 15 """ 16 a timer that can counts down the seconds. 17 """ 18 def run(self): 19 counter = self.runTime 20 for sec in range(self.runTime): 21 print (counter) 22 time.sleep(1.0) 23 counter -= 1 24 print ("Done") 25 26 class CountDownExec(CountDownTimer): 27 """ 28 a timer that execute an action at the end of the timer run. 29 """ 30 def __init__(self, seconds, action, args=[]): 31 self.args = args 32 self.action = action 33 CountDownTimer.__init__(self, seconds) 34 def run(self): 35 CountDownTimer.run(self) 36 self.action(self.args) 37 38 def myAction(args=[]): 39 print ("Performing my action with args:") 40 print (args) 41 42 if __name__ == "__main__": 43 t = CountDownExec(3, myAction, ["hello", "world"]) 44 t.start() 45 print("2333")
1 ''' 2 使用sched模塊實現的timer,sched模塊不是循環的,一次調度被執行后就Over了,如果想再執行,可以使用while循環的方式不停的調用該方法 3 Created on 2013-7-31 4 5 @author: Eric 6 ''' 7 import time, sched 8 9 #被調度觸發的函數 10 def event_func(msg): 11 print("Current Time:", time.strftime("%y-%m-%d %H:%M:%S"), 'msg:', msg) 12 13 def run_function(): 14 #初始化sched模塊的scheduler類 15 s = sched.scheduler(time.time, time.sleep) 16 #設置一個調度,因為time.sleep()的時間是一秒,所以timer的間隔時間就是sleep的時間,加上enter的第一個參數 17 s.enter(0, 2, event_func, ("Timer event.",)) 18 s.run() 19 20 def timer1(): 21 while True: 22 #sched模塊不是循環的,一次調度被執行后就Over了,如果想再執行,可以使用while循環的方式不停的調用該方法 23 time.sleep(1) 24 run_function() 25 26 if __name__ == "__main__": 27 timer1()
感謝耐心閱讀,希望對你有幫助。
