Python 3.X 實現定時器 Timer,制作抽象的Timer定時器基類


Python 在不依賴第三方庫的前提下,對於定時器的實現並不是很完美,但是這不意味着我們無法實現。

閱讀了網上的一些資料,得出一些結論,順手寫了一個基類的定時器(Python3)

 

BaseTimer:

 1 # -*- coding: utf-8 -*-
 2 
 3 
 4 from abc import ABCMeta, abstractmethod
 5 import time  
 6 import threading  
 7 
 8 class BaseTimer(threading.Thread):
 9     """
10     基礎定時器抽象類,可以隨意繼承本類並且實現exec抽象方法即可定時產生任務
11     """
12     __metaclass__ = ABCMeta
13     def __init__(self,howtime=1.0,enduring=True):
14         """
15         howtime 每次定時的時間
16         enduring 是否是一個持久性的任務,用這個可以控制開關
17         """
18 
19         self.enduring = enduring
20         self.howtime = howtime
21         threading.Thread.__init__(self)
22     
23     def run(self):
24         time.sleep(self.howtime)  #至少執行一次 任務
25         self.exec()
26         while self.enduring:    #是否持久,或者是否進行運行
27             time.sleep(self.howtime)
28             self.exec()    #每次開始執行任務
29 
30     @abstractmethod
31     def exec(self):    
32         """抽象方法,子類實現"""
33         pass
34 
35     def destroy(self):
36         """銷毀自身"""
37         self.enduring = False
38         del self
39 
40     def stop(self):
41         self.enduring = False
42 
43     def restart(self):
44         self.enduring = True
45         
46     def get_status(self):
47         return self.enduring
48         

如何使用?

 


 

我們來建立一個新的任務,這個任務是過一段時間就輸出:

 1 class TimerMask(BaseTimer):
 2     """定時任務類,不同的業務邏輯"""
 3     def __init__(self,howtime=1.0,enduring=True):
 4         BaseTimer.__init__(self,howtime,enduring)
 5         self.ws=0
 6 
 7     def exec(self):
 8         print("HelloWorld!")
 9         self.ws = self.ws + 1  #這里是過0.8秒輸出一次
10         if self.ws >5:
11             self.destroy()

加入如下:

 1 if __name__ == "__main__":
 2     Q_w = 0
 3     w = TimerMask(howtime=0.8)
 4     print("-")
 5     w.start() 
 6     #這里線程輸出這些,做其他事情的
 7     while True:
 8         time.sleep(0.4) #0.4秒  9         print("- ",Q_w," - WMask:",w)  
10         Q_w += 1
11         pass

輸出:


 

 

於是你可能會想問,那豈不是每個不同的行為都要寫一個繼承了BaseTimer的類來做事呢,其實不然,你可以寫個函數調用的TimerFunc類:

 1 class TimerFunc(BaseTimer):
 2     """可傳遞任何函數的定時任務類"""
 3     def __init__(self,func,howtime=1.0,enduring=True):
 4         BaseTimer.__init__(self,howtime,enduring)
 5         self.func = func
 6 
 7     def exec(self):
 8         self.func() #調用函數
 9 
10 #使用方法:
11 def doing():
12     print("Hello")
13 
14 w = TimerFunc(doing)
15 w.start()

輸出:"Hello",並且會每隔1秒執行一次

是不是覺得可以做一堆事情了?你可以自由發揮,繼承BaseTimer類

1 w = TimerFunc(doing,5,False) #這樣就可以定義延遲5秒使用了~
2 w.start()

 

 

在搜索資料的時候,找到了網上大部分的實現方法,其實都差不多,感興趣你可以看看:

 1 import threading ,time
 2 from time import sleep, ctime
 3 class Timer(threading.Thread):
 4         """
 5         very simple but useless timer.
 6         """
 7         def __init__(self, seconds):
 8                 self.runTime = seconds
 9                 threading.Thread.__init__(self)
10         def run(self):
11                 time.sleep(self.runTime)
12                 print ("Buzzzz!! Time's up!")
13  
14 class CountDownTimer(Timer):
15         """
16         a timer that can counts down the seconds.
17         """
18         def run(self):
19                 counter = self.runTime
20                 for sec in range(self.runTime):
21                         print (counter)
22                         time.sleep(1.0)
23                         counter -= 1
24                 print ("Done")
25  
26 class CountDownExec(CountDownTimer):
27         """
28         a timer that execute an action at the end of the timer run.
29         """
30         def __init__(self, seconds, action, args=[]):
31                 self.args = args
32                 self.action = action
33                 CountDownTimer.__init__(self, seconds)
34         def run(self):
35                 CountDownTimer.run(self)
36                 self.action(self.args)
37  
38 def myAction(args=[]):
39         print ("Performing my action with args:")
40         print (args)
41  
42 if __name__ == "__main__":
43         t = CountDownExec(3, myAction, ["hello", "world"])
44         t.start()
45         print("2333")
View Code
 1 '''
 2 使用sched模塊實現的timer,sched模塊不是循環的,一次調度被執行后就Over了,如果想再執行,可以使用while循環的方式不停的調用該方法
 3 Created on 2013-7-31
 4 
 5 @author: Eric
 6 '''
 7 import time, sched
 8 
 9 #被調度觸發的函數
10 def event_func(msg):
11     print("Current Time:", time.strftime("%y-%m-%d %H:%M:%S"), 'msg:', msg)
12 
13 def run_function():
14     #初始化sched模塊的scheduler類
15     s = sched.scheduler(time.time, time.sleep)
16     #設置一個調度,因為time.sleep()的時間是一秒,所以timer的間隔時間就是sleep的時間,加上enter的第一個參數
17     s.enter(0, 2, event_func, ("Timer event.",))
18     s.run()
19 
20 def timer1():
21     while True:
22         #sched模塊不是循環的,一次調度被執行后就Over了,如果想再執行,可以使用while循環的方式不停的調用該方法
23         time.sleep(1)
24         run_function()
25 
26 if __name__ == "__main__":
27     timer1()
View Code

 

 


 

感謝耐心閱讀,希望對你有幫助。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM