APScheduler定時任務


參考

https://www.cnblogs.com/huchong/p/9088611.html
https://cloud.tencent.com/developer/article/1172218
https://www.cnblogs.com/huchong/p/9088611.html


目錄

1. 安裝
2. 簡單使用
3. 日志
4. 刪除任務
5. 停止恢復任務
6. 意外


1. 安裝

pip install apscheduler

2. 簡單使用

  1. 5s執行一次aps_test方法
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test():
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好'


scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
scheduler.start()

輸出為

2020-05-08 11:02:50 你好
2020-05-08 11:02:55 你好
2020-05-08 11:03:00 你好

apscheduler分為4個模塊,
分別是 Triggers,Job stores,Executors,Schedulers.
從上面的例子我們就可以看出來了,triggers就是觸發器

  1. 幾種定時
  • date表示具體的一次性任務
  • interval表示循環任務
  • cron表示定時任務
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3)

scheduler.start()

輸出如下

2020-05-08 11:01:24 循環任務
2020-05-08 11:01:25 定時任務
2020-05-08 11:01:27 循環任務
2020-05-08 11:01:30 定時任務
2020-05-08 11:01:30 循環任務
2020-05-08 11:01:33 一次性任務
2020-05-08 11:01:33 循環任務
2020-05-08 11:01:35 定時任務
2020-05-08 11:01:36 循環任務
2020-05-08 11:01:39 循環任務
2020-05-08 11:01:40 定時任務
...

3. 日志

增加日志配置

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print 1/0
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler._logger = logging
scheduler.start()

打印了定時任務遇到的錯誤

2020-05-08 11:14:52 base.py[line:440] INFO Adding job tentatively -- it will be properly scheduled when the scheduler starts
2020-05-08 11:14:52 base.py[line:881] INFO Added job "aps_test" to job store "default"
2020-05-08 11:14:52 base.py[line:166] INFO Scheduler started
2020-05-08 11:14:55 base.py[line:123] INFO Running job "aps_test (trigger: cron[second='*/5'], next run at: 2020-05-08 11:14:55 CST)" (scheduled at 2020-05-08 11:14:55+08:00)
2020-05-08 11:14:55 base.py[line:131] ERROR Job "aps_test (trigger: cron[second='*/5'], next run at: 2020-05-08 11:15:00 CST)" raised an exception
Traceback (most recent call last):
  File "D:\Python\Language\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "D:/DEV/workspace/V4082/workorder-data/scheduler.py", line 14, in aps_test
    print(1/0)
ZeroDivisionError: division by zero
2020-05-08 11:15:00 base.py[line:123] INFO Running job "aps_test (trigger: cron[second='*/5'], next run at: 2020-05-08 11:15:00 CST)" (scheduled at 2020-05-08 11:15:00+08:00)
2020-05-08 11:15:00 base.py[line:131] ERROR Job "aps_test (trigger: cron[second='*/5'], next run at: 2020-05-08 11:15:05 CST)" raised an exception
Traceback (most recent call last):
  File "D:\Python\Language\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "D:/DEV/workspace/V4082/workorder-data/scheduler.py", line 14, in aps_test
    print(1/0)
ZeroDivisionError: division by zero

4. 刪除任務

可以根據id刪除任務

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x


def aps_date(x):
    scheduler.remove_job('interval_task')
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
    

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task')
scheduler.add_job(func=aps_date, args=('一次性任務,刪除循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task')
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
scheduler._logger = logging

scheduler.start()
2020-05-08 11:25:19 循環任務
2020-05-08 11:25:20 定時任務
2020-05-08 11:25:22 循環任務
2020-05-08 11:25:25 定時任務
2020-05-08 11:25:25 循環任務
2020-05-08 11:25:28 循環任務
2020-05-08 11:25:28 一次性任務,刪除循環任務
2020-05-08 11:25:30 定時任務
2020-05-08 11:25:35 定時任務
2020-05-08 11:25:40 定時任務
2020-05-08 11:25:45 定時任務
2020-05-08 11:25:50 定時任務
2020-05-08 11:25:55 定時任務

5. 停止恢復任務

看看官方文檔,還有pause_job,resume_job,用法跟remove_job一樣。

6. 意外

任何代碼都可能發生意外,關鍵是,發生意外了,如何第一時間知道。

可以添加一個監聽器

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x


def date_test(x):
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x
    print 1/0


def my_listener(event):
    if event.exception:
        print '任務出錯了!!!!!!'
        # 發送郵件通知
    else:
        print '任務照常運行...'

scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一定性任務,會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = logging

scheduler.start()
2020-05-08 11:43:27 循環任務
任務照常運行...
2020-05-08 11:43:30 循環任務
任務照常運行...
2020-05-08 11:43:33 循環任務
任務照常運行...
2020-05-08 11:43:36 循環任務
任務照常運行...
2020-05-08 11:43:39 一定性任務,會出錯
任務出錯了!!!!!!
2020-05-08 11:43:39 循環任務
任務照常運行...
2020-05-08 11:43:42 循環任務
任務照常運行...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM