引言
前面已經講過Celery做定時任務的場景,現在分享另一個框架Apscheduler。Apscheduler的全稱是Advanced Python Scheduler。它是一個輕量級的 Python 定時任務調度框架。同時,它還支持異步執行、后台執行調度任務。本人小小的建議是一般項目用APScheduler,因為不用像Celery那樣再單獨啟動worker、beat進程,而且API也很簡潔。
需求背景
前端時間雙十一公司業務暴增的情況下,訂單也是暴增,要在釘釘群定時播報關鍵的業務數據,這個時候需要一個簡潔又快速出結果的方案。於是偷偷用python花了不到半個小時寫了一個不到30行的腳本(包括調試),完成了領導的需求。
簡介
Apscheduler的官方文檔可以參考:https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html#module-apscheduler.triggers.cron 或:https://apscheduler.readthedocs.io/en/latest/userguide.html#
Python定時任務框架APScheduler,Advanced Python Scheduler (APScheduler) 是一個輕量級但功能強大的進程內任務調度器,作用為在指定的時間規則執行指定的作業(時間規則:指定的日期時間、固定時間間隔以及類似Linux系統中Crontab的方式);並且該框架可以進行持久化配置,保證在項目重啟或者崩潰恢復后仍然能夠恢復之前的作業繼續運行。
特點
1、不依賴於Linux系統的crontab系統定時,獨立運行
2、可以動態添加新的定時任務,如下單后30分鍾內必須支付,否則取消訂單,就可以借助此工具(每下一單就要添加此訂單的定時任務)
3、對添加的定時任務可以做持久保存
四大組件
觸發器(triggers):觸發器包含調度邏輯,描述一個任務何時被觸發,按日期或按時間間隔或按 cronjob 表達式三種方式觸發。每個作業都有它自己的觸發器,除了初始配置之外,觸發器是完全無狀態的。
作業存儲器(job stores):作業存儲器指定了作業被存放的位置,默認情況下作業保存在內存,也可將作業保存在各種數據庫中,當作業被存放在數據庫中時,它會被序列化,當被重新加載時會反序列化。作業存儲器充當保存、加載、更新和查找作業的中間商。在調度器之間不能共享作業存儲。
執行器(executors):執行器是將指定的作業(調用函數)提交到線程池或進程池中運行,當任務完成時,執行器通知調度器觸發相應的事件。
調度器(schedulers):任務調度器,屬於控制角色,通過它配置作業存儲器、執行器和觸發器,添加、修改和刪除任務。調度器協調觸發器、作業存儲器、執行器的運行,通常只有一個調度程序運行在應用程序中,開發人員通常不需要直接處理作業存儲器、執行器或觸發器,配置作業存儲器和執行器是通過調度器來完成的。
重要組件說明
觸發器(triggers)——目前APScheduler支持觸發器:
DateTrigger IntervalTrigger CronTrigger
DateTrigger: 指定日期時間執行一次
IntervalTrigger: 固定時間間隔執行,支持每秒、每分、每時、每天、每周
CronTrigger: 類似Linux系統的Crontab定時任務
DateTrigger和IntervalTrigger很好理解,使用也比較簡單,這里重點說一下CronTrigger觸發器。
CronTrigger觸發器的參數選項如下:
CronTrigger可用的表達式:
執行器(executors)——目前APScheduler支持的Executor:
AsyncIOExecutor GeventExecutor ThreadPoolExecutor ProcessPoolExecutor TornadoExecutor TwistedExecutor
作業存儲器(job stores)——目前APScheduler支持的Jobstore:
MemoryJobStore MongoDBJobStore RedisJobStore RethinkDBJobStore SQLAlchemyJobStore ZooKeeperJobStore
調度器(schedulers)——目前APScheduler支持的Scheduler:
AsyncIOScheduler BackgroundScheduler --非阻塞方式 BlockingScheduler --阻塞方式 GeventScheduler QtScheduler TornadoScheduler TwistedScheduler
Job作業——Job作為APScheduler最小執行單位。創建Job時指定執行的函數,函數中所需參數,Job執行時的一些設置信息。
id:指定作業的唯一ID name:指定作業的名字 trigger:apscheduler定義的觸發器,用於確定Job的執行時間,根據設置的trigger規則,計算得到下次執行此job的 時間, 滿足時將會執行 executor:apscheduler定義的執行器,job創建時設置執行器的名字,根據字符串你名字到scheduler獲取到執行此 job的 執行器,執行job指定的函數 max_instances:執行此job的最大實例數,executor執行job時,根據job的id來計算執行次數,根據設置的最大實例數 來確定是否可執行 next_run_time:Job下次的執行時間,創建Job時可以指定一個時間[datetime],不指定的話則默認根據trigger獲取觸 發時間 misfire_grace_time:Job的延遲執行時間,例如Job的計划執行時間是21:00:00,但因服務重啟或其他原因導致 21:00:31才執行,如果設置此key為40,則該job會繼續執行,否則將會丟棄此job coalesce:Job是否合並執行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發器設置為5s執行 一次,因此此job錯過了4個執行時間,如果設置為是,則會合並到一次執行,否則會逐個執行 func:Job執行的函數 args:Job執行函數需要的位置參數 kwargs:Job執行函數需要的關鍵字參數
創建步驟
基本分為四個步驟:創建調度器→添加調度任務/觸發器(滿足條件)→執行器
# 1.創建調度器 # 后台執行 此處程序不會發生阻塞 scheduler = BackgroundScheduler() # 2.添加調度任務 # 3.觸發器triggers='interval' # 每隔20秒執行一次 scheduler.add_job(main, 'interval', seconds=20) # 4.滿足條件執行器 scheduler.start()
觸發器 Trigger使用三種場景
date——定時調度(在特定的時間日期執行,作業只會執行一次)
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler sched = BlockingScheduler() def my_job(): print(1) # The job will be executed on November 6th, 2009 sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) # The job will be executed on November 6th, 2009 at 16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
interval——間隔調度(每隔多久執行一次)
from datetime import datetime import os from apscheduler.schedulers.blocking import BlockingScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BlockingScheduler() # sep1 每隔3秒執行一次 scheduler.add_job(tick, 'interval', seconds=3) # sep2 表示每隔3天17時19分07秒執行一次任務 scheduler.add_job(tick, 'interval', days=3, hours=17, minutes=19, seconds=7) # 每20秒執行一次 scheduler.add_job(tick, 'interval', seconds=61) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C ')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
cron——某一定時時間執行(按指定的周期執行):
from datetime import datetime import os from apscheduler.schedulers.blocking import BlockingScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BlockingScheduler() # 表示每天的19:23 分執行任務 scheduler.add_job(tick, 'cron', hour=19,minute=23) # 每天8點整執行 scheduler.add_job(tick, 'cron', day_of_week='0-6', hour=8, minute=00, second=00) # 每天0點,1點,8點執行 scheduler.add_job(tick,'cron',month='*', day='*', hour='0,1,8',minute='00') # 表示2017年3月22日17時19分07秒執行該程序 scheduler.add_job(tick, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7) # 表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序 scheduler.add_job(tick, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00 scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30') # 表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5 scheduler.add_job(tick, 'cron', second='*/5') print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C ')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
完整Demo
#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2020/11/17 0017 22:16 # @Author : liudinglong # @File : send_demo.py # @Description: # @Question: ''' from datetime import datetime import json import urllib.request import pymysql as pms from apscheduler.schedulers.background import BackgroundScheduler,BlockingScheduler import os # Mac下關閉ssl驗證用到以下模塊 import ssl ''' ---------------------------------------------- # 需要CMD命令下安裝以下支持庫: # pip install apscheduler # pip install pymysql ---------------------------------------------- ''' # Mac和Linux下關閉ssl驗證,不然會報錯 ssl._create_default_https_context = ssl._create_unverified_context # 你的釘釘機器人url global myurl my_url = "https://oapi.dingtalk.com/robot/send?access_token=XXXXXXXXXXXX" def send_request(url, datas): # 傳入url和內容發送請求 # 構建一下請求頭部 header = { "Content-Type": "application/json", "Charset": "UTF-8" } sendData = json.dumps(datas) # 將字典類型數據轉化為json格式 sendDatas = sendData.encode("utf-8") # python3的Request要求data為byte類型 # 發送請求 request = urllib.request.Request(url=url, data=sendDatas, headers=header) # 將請求發回的數據構建成為文件格式 opener = urllib.request.urlopen(request) # 打印返回的結果 print(opener.read()) def get_mysqldatas(sql): # 一個傳入sql導出數據的函數,實例為MySQL需要先安裝pymysql庫,cmd窗口命令:pip install pymysql # 跟數據庫建立連接 conn = pms.connect(host='服務器地址', user='用戶名', passwd='密碼', database='數據庫', port=3306, charset="utf8") # 使用 cursor() 方法創建一個游標對象 cur = conn.cursor() # 使用 execute() 方法執行 SQL cur.execute(sql) # 獲取所需要的數據 datas = cur.fetchall() # 關閉連接 cur.close() # 返回所需的數據 return datas def get_ddmodel_datas(type): # 返回釘釘模型數據,1:文本;2:markdown所有人;3:markdown帶圖片,@接收人;4:link類型 if type == 1: my_data = { "msgtype": "text", "text": { "content": " " }, "at": { "atMobiles": [ "188XXXXXXX" ], "isAtAll": False } } elif type == 2: my_data = { "msgtype": "markdown", "markdown": {"title": " ", "text": " " }, "at": { "isAtAll": True } } elif type == 3: my_data = { "msgtype": "markdown", "markdown": {"title": " ", "text": " " }, "at": { "atMobiles": [ "188XXXXXXXX" ], "isAtAll": False } } elif type == 4: my_data = { "msgtype": "link", "link": { "text": " ", "title": " ", "picUrl": "", "messageUrl": " " } } return my_data def main(): print('Main! The time is: %s' % datetime.now()) # 按照釘釘給的數據格式設計請求內容 鏈接https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.p7hJKp&treeId=257&articleId=105735&docType=1 # 調用釘釘機器人全局變量myurl global myurl # 1.Text類型群發消息 # 合並標題和數據 My_content = "hello, @188XXXXXXXX 這是一個測試消息" my_data = get_ddmodel_datas(1) # 把文本內容寫入請求格式中 my_data["text"]["content"] = My_content send_request(my_url, my_data) # 2.Markdown類型群發消息(MySQL查詢結果發送) # 獲取sql數據 sql = "SELECT branch_no,count(*) from wzy_customer_user group by branch_no order by branch_no" my_mydata = get_mysqldatas(sql) str1 = '\t\n\r' seq = [] for i in range(len(my_mydata)): seq.append(str(my_mydata[i])) data = str1.join(seq) data = data.replace('\'', '') data = data.replace('(', '') data = data.replace(')', '') data = data.replace(',', '\t') print(data) Mytitle = "#### XXX報表\r\n單位\t數量\t\n\r %s" my_Mytitle = Mytitle.join('\t\n') % data my_data = get_ddmodel_datas(2) my_data["markdown"]["title"] = "XXXX 通報" my_data["markdown"]["text"] = my_Mytitle send_request(my_url, my_data) # 3.Markdown(帶圖片@對象) my_data = get_ddmodel_datas(3) my_data["markdown"]["title"] = "系統預警" my_data["markdown"][ "text"] = "#### 系統預警內容 \n > @188XXXXXXXX \n\n > \n > ###### 20點00分發布 [詳情](http://www.baidu.cn/)" send_request(my_url, my_data) # 字體顏色:<font color='#FF4500' size=2>%s</font> 雙\n\n表示換行 # 4.Link類型群發消息 my_data = get_ddmodel_datas(4) my_data["link"]["text"] = "群機器人是釘釘群的高級擴展功能。群機器人可以將第三方服務的信息聚合到群聊中,實現自動化的信息同步。 " my_data["link"]["title"] = "自定義機器人協議" my_data["link"][ "messageUrl"] = "https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.Rqyvqo&treeId=257&articleId=105735&docType=1" send_request(my_url, my_data) if __name__ == "__main__": # 定時執行任務,需要先安裝apscheduler庫,cmd窗口命令:pip install apscheduler # 隨腳本執行 # 1.創建調度器 # scheduler = BlockingScheduler() --阻塞方式 # 后台執行 此處程序不會發生阻塞 scheduler = BackgroundScheduler() # 2.添加調度任務 # 3.觸發器triggers='interval' # 每隔20秒執行一次 scheduler.add_job(main, 'interval', seconds=20) ''' ***定時執行示例*** #固定時間執行一次 #sched.add_job(main, 'cron', year=2018, month=9, day=28, hour=15, minute=40, second=30) #表示2017年3月22日17時19分07秒執行該程序 scheduler.add_job(my_job, 'cron', year=2017,month = 03,day = 22,hour = 17,minute = 19,second = 07) #表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序 scheduler.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') #表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00 scheduler.add_job(my_job(), 'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2014-05-30') #表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5 scheduler.add_job(my_job, 'cron',second = '*/5') ''' # 4.滿足條件執行器 scheduler.start() print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: # 其他任務是獨立的線程執行 while True: pass # time.sleep(60) # print('進程正在執行!') except (KeyboardInterrupt, SystemExit): # 終止任務 scheduler.shutdown() print('Exit The Job!')
使用案例——釘釘群定時播報消息
1、在釘釘群助手中,自定義一個機器人,如圖:
代碼設置10秒發送一次,具體如下:
scheduler.add_job(main,'interval',seconds=10)
運行結果:
截圖如下:
腳本部署
定時任務的腳本在一定時期內是需要持久使用,如果用IDE跑肯定不方面,於是將它弄到服務器上。
先把腳本上傳到服務器上,然后按照相關的庫,最后就是啟動,在Linux啟動方式如下:
linux命令運行py腳本:nohup python -u test.py > out.log 2>&1 &
日志:
這里需要注意的是,參數使用-u的意義:
python的輸出有緩沖,導致out.log並不能夠馬上看到輸出。 -u 參數,使得python不啟用緩沖。
nohup就是不掛起的意思( no hang up)。該命令的一般形式為:nohup ./test &
末尾加個&是指在后台運行,不會因為終端關閉或斷開連接而終止程序。
具體可以參考:https://www.runoob.com/linux/linux-comm-nohup.html
這樣就啟動了一個py服務。
總結
對定時任務框架Apscheduler的簡單使用到此。在工作中遇到其他需要,可以進一步了解,學習是為了解決問題,為了更好的工作。同時,歡迎小伙伴進去溝通交流測試心得與工作方法。