APScheduler的簡單使用


from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import time
import logging

'''
APScheduler提供了七種類型的調度器:
(1)BlockingScheduler : 調度器在當前進程的主線程中執行,會阻塞當前線程
(2)BackgroundScheduler  :  調度器在后台線程中執行, 不會阻塞當前線程
(3)AsyncIOScheduler : 結合AsyncIo 模塊(一個異步框架)一起使用

APScheduler觸發器有3種:
    cron : 功能最強大
    interval: 周期性的執行
    date: 只執行一次

'''

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log.txt',
                    filemode='a')


def aps_test():
    print(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S'), '你好')


def aps_test_name(name):
    """
    帶參定時任務
    :param name:
    :return:
    """
    print(datetime.datetime.now().strftime('%Y-%m-%D %H-%M-%S'), name)


def aps_test_param(number):
    print('aps_test_param invoker....')
    # 如果number == 0 ,這里會報錯,報錯信息會打印到日志文件log.txt中
    # 雖然這兒會報錯,但是定時任務並不會中斷
    print(10 / number)


def job_listener(event):
    """
    APScheduler提供的監控功能
    :param event:
    :return:
    """
    if event.exception:
        # print(dir(event))
        print(event.job_id, '任務執行過程出錯,發個郵件通知運維人員')
    else:
        # print('定時任務正常執行。。。。。')
        pass


scheduler = BlockingScheduler()
# 添加一個定時任務, 使用cron觸發器
aps_test = scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
# print(aps_test)
# 可以用來修改這個定時任務,比如間隔周期啥的
# aps_test.modify()

# 添加一個定時任務, 使用interval觸發器
scheduler.add_job(func=aps_test_name, args=('美女',), trigger='interval', seconds=10, id='id1')

# 添加一個定時任務(只會執行1次),使用date觸發器
scheduler.add_job(func=aps_test_param, args=(10,), trigger='date', run_date='2020-02-11 20:01:01')

# 添加一個定時任務,用來模擬定時任務中報錯的情況
# 每個job默認都有一個job_id, 不過默認的job_id 是一串隨機字符串,沒有可讀性,這兒顯示聲明一個job_id 以便於定位問題job
scheduler.add_job(func=aps_test_param, args=[0], trigger='interval', seconds=6, id='aps_test_param2')

# 添加監控
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# 日志
scheduler._logger = logging

scheduler.start()

"""
 以上是使用BlockingScheduler這個調度器。
 如果使用BackgroundScheduler這個調度器需要搞個主線程。

"""

 

 

。。。。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM