1、BlockingScheduler:當調度程序是您的流程中唯一運行的東西時使用 2、只需調用start()調度程序即可啟動調度 程序。對於除以外的調度程序BlockingScheduler,此調用將立即返回,您可以繼續應用程序的初始化過程,可能會向調度程序添加作業。 3、添加作業(add_job())
1、Blockingscheduler 這個是一個阻塞等待程序 2、當調度程序是您的流程中唯一運行的東西時使用
# 代碼! from datetime import datetime import os import logging from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR basicConfig = {"level": logging.INFO, "format": "%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s", "datefmt": '%Y-%m-%d %H:%M:%S', "filename": 'log1.BlockingScheduler', "filemode": "a"} kwargs={ 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'postgres://postgres:123456@127.0.0.1:5433/postgres' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'Asia/Shanghai', # 設置時區 ,存儲桶時區 } def my_listener(event): if event.exception: print('任務出錯了!!!!!!') else: print('任務照常運行...') def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': logging.basicConfig(**basicConfig) scheduler = BlockingScheduler(kwargs) scheduler.add_job(tick, 'interval', seconds=3) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start() except (KeyboardInterrupt, SystemExit): pass
日志格式 紅色標記 basicConfig 日志配置 Logging.basicConfig()初始化日志配置 scheduler._logger = logging 加載日志配置
偵聽器 黃色標記
任何代碼都可能發生意外,關鍵是,發生意外了,如何第一時間知道,這才是公司最關心的,apscheduler已經為我們想到了這些。
存儲桶配置 綠色
幾年前學習得筆記搬過來 谷歌onenote 要vpn 好難受呀