Python3.x:定時任務實現方式
Python3.x下實現定時任務的方式有很多種方式。
一、循環sleep:
最簡單的方式,在循環里放入要執行的任務,然后sleep一段時間再執行。缺點是,不容易控制,而且sleep是個阻塞函數
def timer(n):
'''''
每n秒執行一次
'''
while True:
print(time.strftime('%Y-%m-%d %X',time.localtime()))
yourTask() # 此處為要執行的任務
time.sleep(n)
二、threading的Timer:
例如:5秒后執行
def printHello():
print("start" )
Timer(5, printHello).start()
例如:間隔5秒執行一次
def printHello():
print("start" )
timer = threading.Timer(5,printHello)
timer.start()
if __name__ == "__main__":
printHello()
例如:兩種方式組合用,5秒鍾后執行,並且之后間隔5秒執行一次
def printHello():
print("start")
timer = threading.Timer(5,printHello)
timer.start()
if __name__ == "__main__":
timer = threading.Timer(5,printHello)
timer.start()
三、sched模塊:
sched是一種調度(延時處理機制)。
import time
import os
import sched
# 初始化sched模塊的scheduler類
# 第一個參數是一個可以返回時間戳的函數,第二個參數可以在定時未到達之前阻塞。
schedule = sched.scheduler(time.time, time.sleep)
# 被周期性調度觸發的函數
def execute_command(cmd, inc):
print('執行主程序')
'''''
終端上顯示當前計算機的連接情況
'''
os.system(cmd)
schedule.enter(inc, 0, execute_command, (cmd, inc))
def main(cmd, inc=60):
# enter四個參數分別為:間隔事件、優先級(用於同時間到達的兩個事件同時執行時定序)、被調用觸發的函數,
# 給該觸發函數的參數(tuple形式)
schedule.enter(0, 0, execute_command, (cmd, inc))
schedule.run()
# 每60秒查看下網絡連接情況
if __name__ == '__main__':
main("netstat -an", 60)
四、定時框架APScheduler:
APScheduler是基於Quartz的一個Python定時任務框架。提供了基於日期、固定時間間隔以及crontab類型的任務,並且可以持久化任務。
需要先安裝apscheduler庫,cmd窗口命令:pip install apscheduler
簡單的間隔時間調度代碼:
按 Ctrl+C 復制代碼
按 Ctrl+C 復制代碼
五、定時框架Celery:
非常強大的分布式任務調度框架;
需要先安裝Celery庫,cmd窗口命令: pip install Celery
六、定時框架RQ:
基於Redis的作業隊列工具,優先選擇APScheduler定時框架;
七、使用windows的定時任務:
可以將所需要的Python程序打包成exe文件,然后在windows下設置定時執行。
八、Linux的定時任務(Crontab):
在Linux下可以很方便的借助Crontab來設置和運行定時任務。進入Crontab文件編輯頁面,設置時間間隔,使用一些shell命令來運行bash腳本或者是Python腳本,保存后Linux會自動按照設定的時間來定時運行程序。

