Python開發【筆記】:單線程下執行多個定時任務


單線程多定時任務

  前言:公司業務需求,實例當中大量需要啟動定時器的操作;大家都知道python中的定時器用的是threading.Timer,每當啟動一個定時器時,程序內部起了一個線程,定時器觸發執行結束后,線程自動銷毀;這里就涉及到一個問題,如果同時有大量啟動定時器的需求時,內部線程過多,程序肯定就崩了,有沒有啟一個線程就能完成定時器的操作呢?網上查了一些資料,還沒有看到能解決目前問題的現成代碼,不如自己搞一個試試

 

1、初始版本:

思路:定時器,說白了就是延時執行指定的程序,目前自己重構python里面的定時器不太現實,能力達不到,所以延時操作時還得用到系統定時器,不過我們可以改一下規則;把所有要進行定時操作的程序添加到特定列表中,把列表中定時時間最短程序拿出來,進行threading.Timer(time,callback)綁定,等時間超時觸發自定義的callback,執行剛剛列表取出的程序;然后把時間更新,再次把列表中時間最短的程序拿出了,繼續threading.Timer綁定,不斷的迭代循環;當有新的定時任務加入到列表時,把當前的threading.Timer綁定取消,更新列表中的時間,再次取出最短時間,進行threading.Timer綁定......

代碼:

import threading
import time

class Timer():
    '''單線程下的定時器'''

    def __init__(self):
        self.queues = []
        self.timer = None
        self.last_time = time.time()

    def start(self):
        item = self.get()
        if item:
            self.timer = threading.Timer(item[0],self.execute)
            self.timer.start()

    def add(self,item):
        print('add',item)
        self.flush_time()
        self.queues.append(item)
        self.queues.sort(key=lambda x:x[0])

        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.start()

    def get(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues[0]
        return item

    def pop(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues.pop(0)
        return item

    def flush_time(self):
        curr_time = time.time()
        for i in self.queues:
            i[0] = i[0] - (curr_time - self.last_time)
        self.last_time = curr_time

    def execute(self):
        # if self.timer:
        #     self.timer.cancel()
        #     self.timer = None
        item = self.pop()
        self.flush_time()
        if item:
            callback = item[1]
            args = item[0]
            callback(args)
        self.start()

執行及輸出:

if __name__ == '__main__':
    # 檢測線程數
    def func():
        while True:
            print(threading.active_count())
            time.sleep(1)
    
    f1 = threading.Thread(target=func)
    f1.start()
    
    import logging
    logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")
    def func1(*args):
        logging.info('func1 %s'%args)
        # time.sleep(5)
    
    def func2(*args):
        logging.info('func2 %s' % args)
        # time.sleep(5)
    def func3(*args):
        logging.info('func3 %s' % args)
        # time.sleep(5)
    
    def func4(*args):
        logging.info('func4 %s' % args)
        # time.sleep(5)
    
    def func5(*args):
        logging.info('func5 %s' % args)
        # time.sleep(5)
    
    
    # 測試
    t1 = Timer()
    logging.info('start')
    t1.add([5,func1])
    time.sleep(0.5)
    t1.add([4,func2])
    time.sleep(0.5)
    t1.add([3,func3])
    time.sleep(0.5)
    t1.add([2,func4])
    time.sleep(0.5)
    t1.add([1,func5])
    time.sleep(5)
    t1.add([1,func1])
    t1.add([2,func2])
    t1.add([3,func3])
    t1.add([4,func4])
    t1.add([5,func5])
    
    # 輸出
    # 2
    # 07/27/2017 10:36:47 [Thursday] start
    # add [5, <function func1 at 0x000000D79FC77E18>]
    # add [4, <function func2 at 0x000000D79FCA8488>]
    # 3
    # add [3, <function func3 at 0x000000D79FCA8510>]
    # add [2, <function func4 at 0x000000D79FCA8598>]
    # 3
    # add [1, <function func5 at 0x000000D79FCA8620>]
    # 3
    # 07/27/2017 10:36:50 [Thursday] func5 1
    # 07/27/2017 10:36:51 [Thursday] func4 0.498349666595459
    # 3
    # 07/27/2017 10:36:51 [Thursday] func3 0.49782633781433105
    # 07/27/2017 10:36:52 [Thursday] func2 0.49848270416259766
    # 3
    # 07/27/2017 10:36:52 [Thursday] func1 0.48449039459228516
    # 2
    # 2
    # add [1, <function func1 at 0x000000D79FC77E18>]
    # add [2, <function func2 at 0x000000D79FCA8488>]
    # add [3, <function func3 at 0x000000D79FCA8510>]
    # add [4, <function func4 at 0x000000D79FCA8598>]
    # add [5, <function func5 at 0x000000D79FCA8620>]
    # 3
    # 07/27/2017 10:36:55 [Thursday] func1 0.9990766048431396
    # 3
    # 07/27/2017 10:36:56 [Thursday] func2 0.9988017082214355
    # 3
    # 07/27/2017 10:36:57 [Thursday] func3 0.99928879737854
    # 07/27/2017 10:36:58 [Thursday] func4 0.9991350173950195
    # 3
    # 3
    # 07/27/2017 10:36:59 [Thursday] func5 0.9988160133361816
    
執行代碼

注:查看代碼輸出,所有的定時器都按照標定的時間依次執行,非常完美,一切看起來很美好,只是看起來,呵呵噠,當你把func里面的time.sleep(5)啟用后,線程數蹭蹭的上來了;原因是上個定時器callback還是執行中,下個定時器已經啟動了,這時就又新增了一個線程,哎,失敗

 

2、修訂版本

思路:利用生成者消費者模型,用到threading.Condition條件變量;強制永遠啟用的是一個Timer!

代碼:

import time
import threading
import logging

class NewTimer(threading.Thread):
    '''單線程下的定時器'''
    def __init__(self):
        super().__init__()
        self.queues = []
        self.timer = None
        self.cond = threading.Condition()

    def run(self):
        while True:
            # print('NewTimer',self.queues)
            self.cond.acquire()
            item = self.get()
            callback = None
            if not item:
                logging.info('NewTimer wait')
                self.cond.wait()
            elif item[0] <= time.time():
                new_item = self.pop()
                callback = new_item[1]
            else:
                logging.info('NewTimer start sys timer and wait')
                self.timer = threading.Timer(item[0]-time.time(),self.execute)
                self.timer.start()
                self.cond.wait()
            self.cond.release()

            if callback:
                callback(item[0])

    def add(self, item):
        # print('add', item)
        self.cond.acquire()
        item[0] = item[0] + time.time()
        self.queues.append(item)
        self.queues.sort(key=lambda x: x[0])
        logging.info('NewTimer add notify')
        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.cond.notify()
        self.cond.release()

    def pop(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues.pop(0)
        return item

    def get(self):
        item = None
        if len(self.queues) > 0:
            item = self.queues[0]
        return item

    def execute(self):
        logging.info('NewTimer execute notify')
        self.cond.acquire()
        self.cond.notify()
        self.cond.release()

執行及輸出:

if __name__ == '__main__':
    def func():
        while True:
            print(threading.active_count())
            time.sleep(1)

    f1 = threading.Thread(target=func)
    f1.start()
    logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")

    newtimer = NewTimer()
    newtimer.start()

    def func1(*args):
        logging.info('func1 %s'%args)
        time.sleep(5)

    def func2(*args):
        logging.info('func2 %s' % args)
        time.sleep(5)
    def func3(*args):
        logging.info('func3 %s' % args)
        time.sleep(5)

    def func4(*args):
        logging.info('func4 %s' % args)
        time.sleep(5)

    def func5(*args):
        logging.info('func5 %s' % args)
        time.sleep(5)

    newtimer.add([5,func1])
    newtimer.add([4,func2])
    newtimer.add([3,func3])
    newtimer.add([2,func4])
    newtimer.add([1,func5])
    time.sleep(1)
    newtimer.add([1,func1])
    newtimer.add([2,func2])
    newtimer.add([3,func3])
    newtimer.add([4,func4])
    newtimer.add([5,func5])

# 輸出
# 2
# 07/27/2017 11:26:19 [Thursday] NewTimer wait
# 07/27/2017 11:26:19 [Thursday] NewTimer add notify
# 07/27/2017 11:26:19 [Thursday] NewTimer add notify
# 07/27/2017 11:26:19 [Thursday] NewTimer add notify
# 07/27/2017 11:26:19 [Thursday] NewTimer add notify
# 07/27/2017 11:26:19 [Thursday] NewTimer add notify
# 07/27/2017 11:26:19 [Thursday] NewTimer start sys timer and wait
# 07/27/2017 11:26:20 [Thursday] NewTimer execute notify
# 4
# 07/27/2017 11:26:20 [Thursday] func5 1501125980.2175007
# 07/27/2017 11:26:20 [Thursday] NewTimer add notify
# 07/27/2017 11:26:20 [Thursday] NewTimer add notify
# 07/27/2017 11:26:20 [Thursday] NewTimer add notify
# 07/27/2017 11:26:20 [Thursday] NewTimer add notify
# 07/27/2017 11:26:20 [Thursday] NewTimer add notify
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:26:25 [Thursday] func4 1501125981.2175007
# 3
# 3
# 3
# 3
# 07/27/2017 11:26:30 [Thursday] func1 1501125981.218279
# 3
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:26:35 [Thursday] func3 1501125982.2175007
# 3
# 3
# 3
# 3
# 07/27/2017 11:26:40 [Thursday] func2 1501125982.218279
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:26:45 [Thursday] func2 1501125983.2175007
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:26:50 [Thursday] func3 1501125983.218279
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:26:55 [Thursday] func1 1501125984.2175007
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:27:00 [Thursday] func4 1501125984.218279
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:27:05 [Thursday] func5 1501125985.218279
# 3
# 3
# 3
# 3
# 3
# 07/27/2017 11:27:10 [Thursday] NewTimer wait
輸出

注:這次無論如何測試線程數也不會蹭蹭的上漲,同時可以實現多定時器任務要求;缺點:用到了兩線程,沒有用到單線程去實現,第二時間精准度問題,需要等待上個定時程序執行完畢,程序才能繼續運行

 

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM