APScheduler 定時任務系統


了解定時任務添加方式! 

及定時任務運行方式! 

及其運行原理! 

 

1、安裝 

pip install apschenduler 

 

 

2、核心組件 

觸發器(trigger)包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。 

  

作業存儲(job store)存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,並在加載時被反序列化。調度器不能分享同一個作業存儲。 

  

執行器(executor)處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。 

 

調度器(scheduler)是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。 

3.選擇正確的調度程序,作業存儲,執行程序和觸發器 

要選擇適當的作業存儲,您需要確定是否需要作業持久性。如果您總是在應用程序開始時重新創建作業,則可以使用默認值(MemoryJobStore)。但是,如果您需要作業以在調度程序重新啟動或應用程序崩潰后繼續存在,那么您的選擇通常可以歸結為編程環境中使用的工具。但是,如果您可以自由選擇,則由於其強大的數據完整性保護,因此建議SQLAlchemyJobStore在 PostgreSQL后端上進行選擇。 

同樣,如果您使用上述框架之一,通常會為您選擇執行程序。否則,默認值ThreadPoolExecutor應足以滿足大多數用途。如果您的工作量涉及CPU密集型操作,則應考慮ProcessPoolExecutor改為使用多個CPU內核。您甚至可以同時使用兩者,將進程池執行程序添加為輔助執行程序。 

安排工作時,需要為其選擇一個觸發器。觸發器確定將在運行作業時計算日期/時間的邏輯。APScheduler帶有三種內置的觸發器類型: 

  • date:在您希望在特定時間點僅運行一次作業時使用 

  • interval:當您要以固定的時間間隔運行作業時使用 

  • cron:當您要在一天的特定時間定期運行作業時使用 

也可以將多個觸發器組合成一個觸發器,該觸發器可以在所有參與觸發器約定的時間觸發,也可以在任何觸發器觸發時觸發。有關更多信息,請參見的文檔。combining triggers 

您可以在各自的API文檔頁面上找到每個作業存儲,執行程序和觸發器類型的插件名稱。 

 

  4、調度程序 

  • BlockingScheduler:當調度程序是您的流程中唯一運行的東西時使用 

  • BackgroundScheduler:在不使用以下任何框架且希望調度程序在應用程序內部的后台運行時使用 

  • AsyncIOScheduler:如果您的應用程序使用asyncio模塊,則使用 

  • GeventScheduler:如果您的應用程序使用gevent,則使用 

  • TornadoScheduler:在構建Tornado應用程序時使用 

  • TwistedScheduler:在構建Twisted應用程序時使用 

  • QtScheduler在構建Qt應用程序時使用 

  • UTC作為調度程序的時區 

  • 默認情況下,對新作業關閉合並 

  • 新作業的默認最大實例限制為3 

 

  

 5、配置調度 

  from apscheduler.schedulers.background import BackgroundScheduler 
  scheduler = BackgroundScheduler() 

  # Initialize the rest of the application here, or before the scheduler initialization 

 

配置調度器1(一個名為“ mongo”的MongoDBJobStore 

from pytz import utc 
 
 

from apscheduler.schedulers.background import BackgroundScheduler 

from apscheduler.jobstores.mongodb import MongoDBJobStore 

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore 

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor  

 

jobstores = { 
 'mongo': MongoDBJobStore(), 
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') 
} 

 
executors = { 
    'default': ThreadPoolExecutor(20), 
    'processpool': ProcessPoolExecutor(5) 
} 
 

job_defaults = { 
    'coalesce'False, 
    'max_instances'3 
} 
 

scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

 
 

配置調度器12(一個名為“ default”的SQLAlchemyJobStore(使用SQLite)) 

from apscheduler.schedulers.background import BackgroundScheduler 
 

 
 
# The "apscheduler." prefix is hard coded 
 

scheduler = BackgroundScheduler({ 
    'apscheduler.jobstores.mongo': { 
         'type''mongodb' 
    }, 
    'apscheduler.jobstores.default': { 
        'type''sqlalchemy', 
        'url''sqlite:///jobs.sqlite' 
    }, 
    'apscheduler.executors.default': { 
        'class''apscheduler.executors.pool:ThreadPoolExecutor', 
        'max_workers''20' 
    }, 
     

'apscheduler.executors.processpool': { 
        'type''processpool', 
        'max_workers''5' 
    }, 
    'apscheduler.job_defaults.coalesce''false', 
    'apscheduler.job_defaults.max_instances''3', 
    'apscheduler.timezone''UTC', 
})  #apscheduler.timezone 是設置存儲桶時區也是作業的時間 國內用這個Asia/Shanghai 

 

一個名為“ default”的ThreadPoolExecutor,其工作人員數為20 

一個名為“ processpool”的ProcessPoolExecutor,其工作人員數為 

from pytz import utc 
 
 

from apscheduler.schedulers.background importBackgroundScheduler 
 

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore 
 

from apscheduler.executors.pool import  

ProcessPoolExecutor 
 
 
jobstores = { 
    'mongo': {'type''mongodb'}, 
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') 
} 
 

executors = { 
    'default': {'type''threadpool''max_workers'20}, 
    'processpool': ProcessPoolExecutor(max_workers=5) 
} 
 

job_defaults = { 
    'coalesce'False, 
    'max_instances'3 
} 
 

scheduler = BackgroundScheduler() 
 

 
# .. do something else here, maybe add jobs etc. 
 
 

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) 

 

replace_existing=True 

程序中斷后重新運行時會自動從數據庫讀取作業信息,而不需要重新再添加到調度器中,如果不注釋 21-25 行添加作業的代碼,則作業會重新添加到數據庫中,這樣就有了兩個同樣的作業,避免出現這種情況可以在 add_job 的參數中增加 replace_existing=True,如 

 

在程序中添加add_job 設置id 那么 沒有replace_existing=True 

1、會報錯,id重復,如果加了重復id 會被過濾不會報錯。 

apscheduler.jobstores.base.ConflictingIdError: 'Job identifier (1234567) conflicts with an existing job' 
 

socketio_server.py 
uwsgl.lm 
uwsgi.log 
uwsgi.pid 
% Scratches and Consoles 
main (1) 
Run: 
•t Traceback (most recent call last): 
defaults.rnax instances' 
tirnezone 
. do something else here, maybe add jobs etc. 
job 
— sched 
sched.start() 
run 3, 24, 10, 
'date', 
run 3, 
13, 
24, 
11, 22, 
5), 
1234567' 1234567") 
Termi I 
File "/home/tang/Desktop/qxym/qxym/apps/apsChenduler/main . py", 
line 66, in <module> 
sched.start() 
File "/h0T,e/tang/ virtualenvs/qxym/lib/python3 6/site-packages/apscheduler/schedulers/blocking py" , 
line 18, in start 
super(810ckingScheduler, self) .start(•args, ••kwargs) 
File "/home/tang/ virtualenvs/qxyT,/lib/python3 6/site-packages/apscheduler/schedulers/base. py" , 
line 162, 
self ._real add job(job, jobstore_alias, replace_existing) 
File "/h0T,e/tang/ virtualenvs/qxym/lib/python3 6/site-packages/apscheduler/schedulers/base. py" , 
line 867, 
store.add '0b) 
Fl e I'/ ore/tang/ . virtua envs/qxym/ 1 /pyt on3.6/s1te-pac ages/apsc e u er/J0 stores/sq a c emy.py", 
Ine 
raise ConflictingldError(job.id) 
apscheduler.jobstores .base.ConflictingIdError: 'Job identifier (1234567) conflicts with an existing job' 
Process finished with exit code 1 
in start 
real add job 
100, 
O IDE and plugin Updates 
pycharm is ready to update. 
Window 
4: Run i 5: Debug 
5: TODO p 9: version control 
Database Changes 
Terminal 
python Console
 
2、會導致原來的存儲作業被刷新,重新執行。有可能會導致多次執行。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM