【FastAPI 學習十二】定時任務篇


定時任務是一個通用場景常見的功能,之前我使用django的時候,更習慣使用celery中的定時任務,現在花時間看了看apscheduler感覺不錯,就寫了demo,並集成到項目代碼中了

任務調度主要就是以下幾個功能

  • 添加/刪除 任務調度
  • 暫停/恢復 任務調度(這條我未實現)
  • 查看定時任務狀態

定時任務

添加定時任務

其中添加定時任務方式,有以下三種方式

  • date: 固定的時間執行一次時 用這種
  • interval: 想要在固定的間隔時間循環執行時用這種
  • cron: 這種就是最為靈活的 crontab 表達式定時任務了

原文 https://apscheduler.readthedocs.io/en/stable/userguide.html#starting-the-scheduler
date: use when you want to run the job just once at a certain point of time
interval: use when you want to run the job at fixed intervals of time
cron: use when you want to run the job periodically at certain time(s) of day

Tip: crontab寫法可以參考這個網站 https://crontab.guru/

在FastAPI異步框架中,選擇 AsyncIOScheduler調度程序

默認使用sqlite持久化定時任務,不至於重啟就失效

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger

Schedule = AsyncIOScheduler(
    jobstores={
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
)
Schedule.start()

不完善的地方

  • 1 由於時間倉促,比如參數驗證的部分,參數數據校驗,還可以繼續完善。
  • 2 添加調度任務參數封裝,我把三種添加任務的三種方式,拆成了三個函數,其中很多參數沒有用到。
  • 3 其中文檔中還有說明最大woker數量限制之類的,然后按照我使用其他定時任務的常識,應該還會函數最長執行時間限制(比如執行的功能函數特別耗時)
  • 4 ...等等

這些就需要自己查看文檔和issues搜索了

其他方案

定時任務有很多種方案,比如可以使用

說到arq, 想起我之前,使用過rq 並且學習的時候稍微翻譯了一下文檔 rq v1.0 https://codercharm.github.io/Python-rq-doc-cn/#/ 一晃過去一年了。

代碼地址

參考地址


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM