django apscheduler在特定時間執行一次任務(run at a specify time only once)


 

  如何使程序在特定時間只執行一次,我查了一下。

       celery可以,時間以秒計。

task = mytask.apply_async(args=[10, 20], countdown=60)

  不過,我們存在以天、星期、甚至月的倒計時,感覺celery還是不滿足需求。

  最后找到了apscheduler。

from django.utils import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

#異步式的 scheduler = BackgroundScheduler( jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())

#阻塞的,適用於scheduler是獨立服務的場景。 #scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone()) #scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone()) def myjob(): pass try: scheduler.start() # 5s后執行myjob # 傳入時間去除毫秒 deadline = datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(seconds=5) scheduler.add_job(myjob, 'date', run_date=deadline) except (KeyboardInterrupt, SystemExit): scheduler.shutdown()

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM