Python定時任務框架之Apscheduler 案例分享


  引言

  前面已經講過Celery做定時任務的場景,現在分享另一個框架Apscheduler。Apscheduler的全稱是Advanced Python Scheduler。它是一個輕量級的 Python 定時任務調度框架。同時,它還支持異步執行、后台執行調度任務。本人小小的建議是一般項目用APScheduler,因為不用像Celery那樣再單獨啟動worker、beat進程,而且API也很簡潔。

  需求背景

  前端時間雙十一公司業務暴增的情況下,訂單也是暴增,要在釘釘群定時播報關鍵的業務數據,這個時候需要一個簡潔又快速出結果的方案。於是偷偷用python花了不到半個小時寫了一個不到30行的腳本(包括調試),完成了領導的需求。

  簡介

  Apscheduler的官方文檔可以參考:https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html#module-apscheduler.triggers.cron  或:https://apscheduler.readthedocs.io/en/latest/userguide.html#

  Python定時任務框架APScheduler,Advanced Python Scheduler (APScheduler) 是一個輕量級但功能強大的進程內任務調度器,作用為在指定的時間規則執行指定的作業(時間規則:指定的日期時間、固定時間間隔以及類似Linux系統中Crontab的方式);並且該框架可以進行持久化配置,保證在項目重啟或者崩潰恢復后仍然能夠恢復之前的作業繼續運行。

  特點

  1、不依賴於Linux系統的crontab系統定時,獨立運行

  2、可以動態添加新的定時任務,如下單后30分鍾內必須支付,否則取消訂單,就可以借助此工具(每下一單就要添加此訂單的定時任務)

  3、對添加的定時任務可以做持久保存

  四大組件

  觸發器(triggers):觸發器包含調度邏輯,描述一個任務何時被觸發,按日期或按時間間隔或按 cronjob 表達式三種方式觸發。每個作業都有它自己的觸發器,除了初始配置之外,觸發器是完全無狀態的。

  作業存儲器(job stores):作業存儲器指定了作業被存放的位置,默認情況下作業保存在內存,也可將作業保存在各種數據庫中,當作業被存放在數據庫中時,它會被序列化,當被重新加載時會反序列化。作業存儲器充當保存、加載、更新和查找作業的中間商。在調度器之間不能共享作業存儲。

  執行器(executors):執行器是將指定的作業(調用函數)提交到線程池或進程池中運行,當任務完成時,執行器通知調度器觸發相應的事件。

  調度器(schedulers):任務調度器,屬於控制角色,通過它配置作業存儲器、執行器和觸發器,添加、修改和刪除任務。調度器協調觸發器、作業存儲器、執行器的運行,通常只有一個調度程序運行在應用程序中,開發人員通常不需要直接處理作業存儲器、執行器或觸發器,配置作業存儲器和執行器是通過調度器來完成的。

  重要組件說明

  觸發器(triggers)——目前APScheduler支持觸發器:

DateTrigger
IntervalTrigger
CronTrigger

  DateTrigger: 指定日期時間執行一次

  IntervalTrigger: 固定時間間隔執行,支持每秒、每分、每時、每天、每周

  CronTrigger: 類似Linux系統的Crontab定時任務

  DateTrigger和IntervalTrigger很好理解,使用也比較簡單,這里重點說一下CronTrigger觸發器。

  CronTrigger觸發器的參數選項如下:

  CronTrigger可用的表達式:

  執行器(executors)——目前APScheduler支持的Executor:

AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor

  作業存儲器(job stores)——目前APScheduler支持的Jobstore:

MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore

  調度器(schedulers)——目前APScheduler支持的Scheduler:

AsyncIOScheduler
BackgroundScheduler --非阻塞方式
BlockingScheduler   --阻塞方式
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler

  Job作業——Job作為APScheduler最小執行單位。創建Job時指定執行的函數,函數中所需參數,Job執行時的一些設置信息。

id:指定作業的唯一ID

name:指定作業的名字

trigger:apscheduler定義的觸發器,用於確定Job的執行時間,根據設置的trigger規則,計算得到下次執行此job的
時間, 滿足時將會執行

executor:apscheduler定義的執行器,job創建時設置執行器的名字,根據字符串你名字到scheduler獲取到執行此
job的 執行器,執行job指定的函數

max_instances:執行此job的最大實例數,executor執行job時,根據job的id來計算執行次數,根據設置的最大實例數
來確定是否可執行

next_run_time:Job下次的執行時間,創建Job時可以指定一個時間[datetime],不指定的話則默認根據trigger獲取觸
發時間

misfire_grace_time:Job的延遲執行時間,例如Job的計划執行時間是21:00:00,但因服務重啟或其他原因導致
21:00:31才執行,如果設置此key為40,則該job會繼續執行,否則將會丟棄此job

coalesce:Job是否合並執行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發器設置為5s執行
一次,因此此job錯過了4個執行時間,如果設置為是,則會合並到一次執行,否則會逐個執行

func:Job執行的函數

args:Job執行函數需要的位置參數

kwargs:Job執行函數需要的關鍵字參數

  創建步驟

  基本分為四個步驟:創建調度器→添加調度任務/觸發器(滿足條件)→執行器

# 1.創建調度器
# 后台執行  此處程序不會發生阻塞
scheduler = BackgroundScheduler()

# 2.添加調度任務
# 3.觸發器triggers='interval'
# 每隔20秒執行一次
scheduler.add_job(main, 'interval', seconds=20)

# 4.滿足條件執行器
scheduler.start()

  觸發器 Trigger使用三種場景

  date——定時調度(在特定的時間日期執行,作業只會執行一次)

from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler

sched = BlockingScheduler()
def my_job():
	print(1)
# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

  interval——間隔調度(每隔多久執行一次)

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # sep1 每隔3秒執行一次
    scheduler.add_job(tick, 'interval', seconds=3)
    # sep2 表示每隔3天17時19分07秒執行一次任務
    scheduler.add_job(tick, 'interval', days=3, hours=17, minutes=19, seconds=7)
    # 每20秒執行一次
    scheduler.add_job(tick, 'interval', seconds=61)

    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C    '))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

   cron——某一定時時間執行(按指定的周期執行):

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # 表示每天的19:23 分執行任務
    scheduler.add_job(tick, 'cron', hour=19,minute=23)
    # 每天8點整執行
    scheduler.add_job(tick, 'cron', day_of_week='0-6', hour=8, minute=00, second=00)
    # 每天0點,1點,8點執行
    scheduler.add_job(tick,'cron',month='*', day='*', hour='0,1,8',minute='00')
    
    # 表示2017年3月22日17時19分07秒執行該程序
    scheduler.add_job(tick, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)

    # 表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序
    scheduler.add_job(tick, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

    # 表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00
    scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

    # 表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5
    scheduler.add_job(tick, 'cron', second='*/5')


    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C    '))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

  

  完整Demo

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time    : 2020/11/17 0017 22:16
# @Author  : liudinglong
# @File    : send_demo.py
# @Description: 
# @Question: 
'''

from datetime import datetime
import json
import urllib.request
import pymysql as pms
from apscheduler.schedulers.background import BackgroundScheduler,BlockingScheduler
import os
# Mac下關閉ssl驗證用到以下模塊
import ssl

'''
----------------------------------------------
# 需要CMD命令下安裝以下支持庫:
# pip install apscheduler
# pip install pymysql
----------------------------------------------
'''
# Mac和Linux下關閉ssl驗證,不然會報錯
ssl._create_default_https_context = ssl._create_unverified_context

# 你的釘釘機器人url
global myurl
my_url = "https://oapi.dingtalk.com/robot/send?access_token=XXXXXXXXXXXX"


def send_request(url, datas):
    # 傳入url和內容發送請求
    # 構建一下請求頭部
    header = {
        "Content-Type": "application/json",
        "Charset": "UTF-8"
    }
    sendData = json.dumps(datas)  # 將字典類型數據轉化為json格式
    sendDatas = sendData.encode("utf-8")  # python3的Request要求data為byte類型
    # 發送請求
    request = urllib.request.Request(url=url, data=sendDatas, headers=header)
    # 將請求發回的數據構建成為文件格式
    opener = urllib.request.urlopen(request)
    # 打印返回的結果
    print(opener.read())


def get_mysqldatas(sql):
    # 一個傳入sql導出數據的函數,實例為MySQL需要先安裝pymysql庫,cmd窗口命令:pip install pymysql
    # 跟數據庫建立連接
    conn = pms.connect(host='服務器地址', user='用戶名', passwd='密碼', database='數據庫', port=3306, charset="utf8")
    # 使用 cursor() 方法創建一個游標對象
    cur = conn.cursor()
    # 使用 execute() 方法執行 SQL
    cur.execute(sql)

    # 獲取所需要的數據
    datas = cur.fetchall()

    # 關閉連接
    cur.close()
    # 返回所需的數據
    return datas


def get_ddmodel_datas(type):
    # 返回釘釘模型數據,1:文本;2:markdown所有人;3:markdown帶圖片,@接收人;4:link類型
    if type == 1:
        my_data = {
            "msgtype": "text",
            "text": {
                "content": " "
            },
            "at": {
                "atMobiles": [
                    "188XXXXXXX"
                ],
                "isAtAll": False
            }
        }
    elif type == 2:
        my_data = {
            "msgtype": "markdown",
            "markdown": {"title": " ",
                         "text": " "
                         },
            "at": {
                "isAtAll": True
            }
        }
    elif type == 3:
        my_data = {
            "msgtype": "markdown",
            "markdown": {"title": " ",
                         "text": " "
                         },
            "at": {
                "atMobiles": [
                    "188XXXXXXXX"
                ],
                "isAtAll": False
            }
        }
    elif type == 4:
        my_data = {
            "msgtype": "link",
            "link": {
                "text": " ",
                "title": " ",
                "picUrl": "",
                "messageUrl": " "
            }
        }
    return my_data


def main():
    print('Main! The time is: %s' % datetime.now())
    # 按照釘釘給的數據格式設計請求內容 鏈接https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.p7hJKp&treeId=257&articleId=105735&docType=1
    # 調用釘釘機器人全局變量myurl
    global myurl

    # 1.Text類型群發消息
    # 合並標題和數據
    My_content = "hello,  @188XXXXXXXX 這是一個測試消息"
    my_data = get_ddmodel_datas(1)
    # 把文本內容寫入請求格式中
    my_data["text"]["content"] = My_content
    send_request(my_url, my_data)

    # 2.Markdown類型群發消息(MySQL查詢結果發送)
    # 獲取sql數據
    sql = "SELECT branch_no,count(*) from wzy_customer_user group by branch_no order by branch_no"
    my_mydata = get_mysqldatas(sql)
    str1 = '\t\n\r'
    seq = []
    for i in range(len(my_mydata)):
        seq.append(str(my_mydata[i]))
    data = str1.join(seq)
    data = data.replace('\'', '')
    data = data.replace('(', '')
    data = data.replace(')', '')
    data = data.replace(',', '\t')
    print(data)

    Mytitle = "#### XXX報表\r\n單位\t數量\t\n\r %s"
    my_Mytitle = Mytitle.join('\t\n') % data
    my_data = get_ddmodel_datas(2)
    my_data["markdown"]["title"] = "XXXX 通報"
    my_data["markdown"]["text"] = my_Mytitle
    send_request(my_url, my_data)

    # 3.Markdown(帶圖片@對象)
    my_data = get_ddmodel_datas(3)
    my_data["markdown"]["title"] = "系統預警"
    my_data["markdown"][
        "text"] = "#### 系統預警內容  \n > @188XXXXXXXX \n\n > ![screenshot](http://i01.lw.aliimg.com/media/lALPBbCc1ZhJGIvNAkzNBLA_1200_588.png)\n  > ###### 20點00分發布 [詳情](http://www.baidu.cn/)"
    send_request(my_url, my_data)
    # 字體顏色:<font color='#FF4500' size=2>%s</font> 雙\n\n表示換行
    # 4.Link類型群發消息
    my_data = get_ddmodel_datas(4)
    my_data["link"]["text"] = "群機器人是釘釘群的高級擴展功能。群機器人可以將第三方服務的信息聚合到群聊中,實現自動化的信息同步。 "
    my_data["link"]["title"] = "自定義機器人協議"
    my_data["link"][
        "messageUrl"] = "https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.Rqyvqo&treeId=257&articleId=105735&docType=1"
    send_request(my_url, my_data)


if __name__ == "__main__":
    # 定時執行任務,需要先安裝apscheduler庫,cmd窗口命令:pip install apscheduler
    # 隨腳本執行
    # 1.創建調度器
    # scheduler = BlockingScheduler() --阻塞方式
    # 后台執行  此處程序不會發生阻塞
    scheduler = BackgroundScheduler()

    # 2.添加調度任務
    # 3.觸發器triggers='interval'
    # 每隔20秒執行一次
    scheduler.add_job(main, 'interval', seconds=20)
    '''
    ***定時執行示例***
    #固定時間執行一次
    #sched.add_job(main, 'cron', year=2018, month=9, day=28, hour=15, minute=40, second=30)
    #表示2017年3月22日17時19分07秒執行該程序
    scheduler.add_job(my_job, 'cron', year=2017,month = 03,day = 22,hour = 17,minute = 19,second = 07)

    #表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序
    scheduler.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

    #表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00
    scheduler.add_job(my_job(), 'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2014-05-30')

    #表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5
    scheduler.add_job(my_job, 'cron',second = '*/5')
    '''
    # 4.滿足條件執行器
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        # 其他任務是獨立的線程執行
        while True:
            pass
            # time.sleep(60)
            # print('進程正在執行!')
    except (KeyboardInterrupt, SystemExit):
        # 終止任務
        scheduler.shutdown()
        print('Exit The Job!')

  使用案例——釘釘群定時播報消息

  1、在釘釘群助手中,自定義一個機器人,如圖:

 

   代碼設置10秒發送一次,具體如下:

scheduler.add_job(main,'interval',seconds=10)

  運行結果:

 

截圖如下:

 

 

 

  腳本部署

  定時任務的腳本在一定時期內是需要持久使用,如果用IDE跑肯定不方面,於是將它弄到服務器上。

  先把腳本上傳到服務器上,然后按照相關的庫,最后就是啟動,在Linux啟動方式如下:

linux命令運行py腳本:nohup python -u test.py > out.log 2>&1 &

  

 

   日志:

  

 

   這里需要注意的是,參數使用-u的意義:

python的輸出有緩沖,導致out.log並不能夠馬上看到輸出。
-u 參數,使得python不啟用緩沖。

  nohup就是不掛起的意思( no hang up)。該命令的一般形式為:nohup ./test & 

  末尾加個&是指在后台運行,不會因為終端關閉或斷開連接而終止程序。

  具體可以參考:https://www.runoob.com/linux/linux-comm-nohup.html

  這樣就啟動了一個py服務。

 

  總結

  對定時任務框架Apscheduler的簡單使用到此。在工作中遇到其他需要,可以進一步了解,學習是為了解決問題,為了更好的工作。同時,歡迎小伙伴進去溝通交流測試心得與工作方法。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM