python定時任務模塊APScheduler


一、簡單任務

定義一個函數,然后定義一個scheduler類型,添加一個job,然后執行,就可以了

5秒整倍數,就執行這個函數

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test():
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好')


scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
scheduler.start()

帶參數的

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5')
scheduler.start()
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3)

scheduler.start()

二、日志

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print 1/0
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler._logger = logging
scheduler.start()

三、刪除任務

要求執行一定階段任務以后,刪除某一個循環任務,其他任務照常進行。有如下代碼:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_date(x):
    scheduler.remove_job('interval_task')
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task')
scheduler.add_job(func=aps_date, args=('一次性任務,刪除循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task')
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
scheduler._logger = logging

scheduler.start()

四、停止任務,恢復任務

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_pause(x):
    scheduler.pause_job('interval_task')
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def aps_resume(x):
    scheduler.resume_job('interval_task')
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task')
scheduler.add_job(func=aps_pause, args=('一次性任務,停止循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task')
scheduler.add_job(func=aps_resume, args=('一次性任務,恢復循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task')
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
scheduler._logger = logging

scheduler.start()

五、捕獲錯誤

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log1.txt',
                    filemode='a')


def aps_test(x):
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


def date_test(x):
    print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
    print (1/0)


def my_listener(event):
    if event.exception:
        print ('任務出錯了!!!!!!')
    else:
        print ('任務照常運行...')

scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一定性任務,會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = logging

scheduler.start()

 六、定時任務的接口設計

# 定時任務功能
@admin.route('/pause', methods=['POST'])
@user_login_req
def pausetask():  # 停止
    data = request.form.get('task_id')
    job = scheduler.get_job(str(data))
    res = {}
    if job:
        if 'pause' in job.__str__():
            res.update({'status': 1001, 'msg': '已停止'})
        else:
            scheduler.pause_job(str(data))
            res.update({'status': 1000, 'msg': '停止中'})
    else:
        res.update({'status': 1001, 'msg': '未運行'})
    return jsonify(res)


@admin.route('/resume', methods=['POST'])
@user_login_req
def resumetask():  # 恢復
    data = request.form.get('task_id')
    job=scheduler.get_job(str(data))
    res={}
    if job:
        if 'run' in job.__str__():
            res.update({'status':1001,'msg':'已恢復'})
        else:
            scheduler.resume_job(str(data))
            res.update({'status': 1000, 'msg': '恢復中'})
    else:
        res.update({'status':1001,'msg':'未運行'})
    return jsonify(res)


@admin.route('/remove_task', methods=['POST'])
@user_login_req
def remove_task():  # 移除
    data = request.form['task_id']
    job = scheduler.get_job(str(data))
    res = {}
    if not job:
        res.update({'status': 1001, 'msg': '已刪除'})
    else:
        scheduler.remove_job(str(data))
        res.update({'status': 1000, 'msg': '刪除中'})
    return jsonify(res)


@admin.route('/addjob', methods=['POST'])
@user_login_req
def addtask():
    data = request.form.get('task_id')

    job = scheduler.get_job(str(data))
    if job:
        return jsonify({'status': 1001,'msg':'已開啟'})
    if data == '1':
        scheduler.add_job(func=task1, id='1', trigger='cron', day_of_week='0-6', hour=18, minute=19,
                          second=10,
                          replace_existing=True)
        # trigger='cron' 表示是一個定時任務
    else:
        scheduler.add_job(func=task2, id='2', trigger='interval', seconds=10,
                          replace_existing=True)
        # trigger='interval' 表示是一個循環任務,每隔多久執行一次
    return jsonify({'status': 1000,'msg':'運行中'})


def task1():
    print('mession1')
    print(datetime.datetime.now())


def task2():
    print('mession2')
    print(datetime.datetime.now())

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM