Python 定時任務框架 APScheduler 詳解


APScheduler

最近想寫個任務調度程序,於是研究了下 Python 中的任務調度工具,比較有名的是:Celery,RQ,APScheduler。

Celery:非常強大的分布式任務調度框架

RQ:基於Redis的作業隊列工具

APScheduler:一款強大的任務調度工具

RQ 參考 Celery,據說要比 Celery 輕量級。在我看來 Celery 和 RQ 太重量級了,需要單獨啟動進程,並且依賴第三方數據庫或者緩存,適合嵌入到較大型的 python 項目中。其次是 Celery 和 RQ 目前的最新版本都不支持動態的添加定時任務(celery 官方不支持,可以使用第三方的 redisbeat 或者 redbeat 實現),所以對於一般的項目推薦用 APScheduler,簡單高效。

Apscheduler是一個基於Quartz的python定時任務框架,相關的 api 接口調用起來比較方便,目前其提供了基於日期、固定時間間隔以及corntab類型的任務,並且可持久化任務;同時它提供了多種不同的調用器,方便開發者根據自己的需求進行使用,也方便與數據庫等第三方的外部持久化儲存機制進行協同工作,非常強大。

安裝

最簡單的方法是使用 pip 安裝:

$ pip install apscheduler

或者下載源碼安裝 APScheduler

$ python setup.py install

目前版本:3.6.3

基本概念

APScheduler 具有四種組件:

  • triggers(觸發器)
  • jobstores (job 存儲)
  • executors (執行器)
  • schedulers (調度器)

triggers:觸發器管理着 job 的調度方式。

jobstores: 用於 job 數據的持久化。默認 job 存儲在內存中,還可以存儲在各種數據庫中。除了內存方式不需要序列化之外(一個例外是使用 ProcessPoolExecutor),其余都需要 job 函數參數可序列化。另外多個調度器之間絕對不能共享 job 存儲(APScheduler 原作者的意思是不支持分布式,但是我們可以通過重寫部分函數實現,具體方法后面再介紹)。

executors :負責處理 job。通常使用線程池(默認)或者進程池來運行 job。當 job 完成時,會通知調度器並發出合適的事件。

schedulers : 將 job 與以上組件綁定在一起。通常在程序中僅運行一個調度器,並且不直接處理 jobstores ,executors 或 triggers,而是通過調度器提供的接口,比如添加,修改和刪除 job。

選擇正確的調度器,job 存儲,執行器和觸發器

調度器的選擇主要取決於編程環境以及 APScheduler 的用途。主要有以下幾種跳度器:

  • apscheduler.schedulers.blocking.BlockingScheduler:當調度器是程序中唯一運行的東西時使用,阻塞式。
  • apscheduler.schedulers.background.BackgroundScheduler:當調度器需要后台運行時使用。
  • apscheduler.schedulers.asyncio.AsyncIOScheduler:當程序使用 asyncio 框架時使用。
  • apscheduler.schedulers.gevent.GeventScheduler:當程序使用 gevent 框架時使用。
  • apscheduler.schedulers.tornado.TornadoScheduler:當構建 Tornado 程序時使用
  • apscheduler.schedulers.twisted.TwistedScheduler:當構建 Twisted 程序時使用
  • apscheduler.schedulers.qt.QtScheduler:當構建 Qt 程序時使用

要選擇適當的 job 存儲,需要看 job 是否需要持久化。如果程序啟動會重新創建作業,則可以使用默認的內存方式(MemoryJobStore)。如果需要 job 在程序重新啟動或崩潰后繼續存在,那么建議使用其他 job 存儲方式。系統內置主要有以下幾種 job 存儲:

  • apscheduler.jobstores.memory.MemoryJobStore:使用內存存儲
  • apscheduler.jobstores.mongodb.MongoDBJobStore:使用 MongoDB 存儲
  • apscheduler.jobstores.redis.RedisJobStore:使用 redis 存儲
  • apscheduler.jobstores.rethinkdb.RethinkDBJobStore:使用 rethinkdb 存儲
  • apscheduler.jobstores.sqlalchemy.SQLAlchemyJobStore:使用 ORM 框架 SQLAlchemy,后端可以是 sqlite、mysql、PoatgreSQL 等數據庫
  • apscheduler.jobstores.zookeeper.ZooKeeperJobStore:使用 zookeeper 存儲

執行器的選擇要根據 job 的類型。默認的線程池執行器 apscheduler.executors.pool.ThreadPoolExecutor 可以滿足大多數情況。如果 job 屬於 CPU 密集型操作則建議使用進程池執行器 apscheduler.executors.pool.ProcessPoolExecutor。當然也可以同時使用兩者,將進程池執行器添加為輔助執行器。

當添加 job 時,可以選擇一個觸發器,它管理着 job 的調度方式。APScheduler 內置三種觸發器:

  • apscheduler.triggers.date:在某個特定時間僅運行一次 job 時使用
  • apscheduler.triggers.interval:當以固定的時間間隔運行 job 時使用
  • apscheduler.triggers.cron:當在特定時間定期運行 job 時使用

配置調度器

APScheduler 提供了多種不同的方式來配置調度器。

假設使用默認 job 存儲和默認執行器運行 BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

以上創建了一個 BackgroundScheduler 調度器,job 存儲使用默認的 MemoryJobStore,執行器使用默認的 ThreadPoolExecutor,最大線程數 10 個。

假如想做以下設置:

  • 一個名為 mongo 的 job 存儲,后端使用 MongoDB
  • 一個名為 default 的 job 存儲,后端使用數據庫(使用 Sqlite)
  • 一個名為 default 的線程池執行器,最大線程數 20 個
  • 一個名為 processpool 的進程池執行器,最大進程數 5 個
  • 調度器使用 UTC 時區
  • 開啟 job 合並
  • job 最大實例限制為 3 個

方法一:

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法二:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法三:

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

scheduler = BackgroundScheduler()
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

啟動調度器

只需調用 start() 即可啟動調度器。對於 BlockingScheduler 以外的調度器,都會直接返回,返回后可以繼續其他工作,比如添加 job;對於 BlockingScheduler ,必須在完成所有初始化已經添加好 job 后才能調用 start()。

注意:調度器啟動后就無法更改配置了。

添加 job

兩種方式:

  1. 使用方法 add_job()
  2. 使用裝飾器 scheduled_job()

第一種是最常用方法,第二種方法適合程序運行后不需要更改的作業。 add_job() 會返回一個 apscheduler.job.Job 實例,可以用於修改或者刪除 job 等。如果添加 job 時,調度器尚未啟動,則會暫停調度 job,並且僅在調度器啟動時才計算其首次運行時間

添加 job 時第二個參數是 trigger,正如前面所說,可以指定三種類型的觸發器:cron、interval 和 date。

cron:在特定時間定期運行 job

兼容 unix/linux 系統 crontab 格式,但是比其多了秒(second)、年(year)、第多少周(week)以及限定開始時間(start_date)和結束時間(end_date)的功能,並且天(day)的設置更加靈活,支持類似 last fri 的格式,具體見以下的詳解。

主要參數:

year(int|str) - 年,4位數

month(int|str) - 月,1-12

day(int|str) - 日,1-31

week(int|str) - 一年中的第多少周,1-53

day_of_week(int|str) - 星期,0-6 或者 mon,tue,wed,thu,fri,sat,sun

hour(int|str) - 小時,0-23

minute(int|str) - 分,0-59

second(int|str) - 秒,0-59

start_date(date|datetime|str) - 開始時間

end_date(date|datetime|str) - 結束時間

不同於 unix/linux 系統 crond 格式,添加 job 時可以忽略不必要的字段。
大於最小有效值的字段默認為*,而較小的字段默認為其最小值,除了 weekday_of_week 默認為 *

可能這種表述不是太理解,舉幾個例子:

day=1, minute=20 最小有效值字段為 minute 故等價於 year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0,意思是在每年每月 1 號每小時的 20 分 0 秒運行;

hour=1 最小有效值字段為 hour 故等價於 year='*', month='*', day=*, week='*', day_of_week='*', hour=1, minute=0, second=0,意思是在每年每月每天 1 點的 0 分 0 秒運行;

month=6, hour=1 最小有效值字段也為 hour 故等價於 year='*', month=6, day=*, week='*', day_of_week='*', hour=1, minute=0, second=0,意思是在每年 6 月每天 0 點 0 分 0 秒運行;

month=6 最小有效值字段也為 month 故等價於 year='*', month=6, day=1, week='*', day_of_week='*', hour=0, minute=0, second=0,意思是在每年 6 月 1號 0 點 0 分 0 秒運行;

year=2020 最小有效值字段也為 year 故等價於 year=2020, month=1, day=1, week='*', day_of_week='*', hour=0, minute=0, second=0,意思是在 2020 年 1 月 1 號 0 點 0 分 0 秒運行;

參數還支持表達式,下表列出了從 yearsecond 字段可用的表達式。一個字段中可以給出多個表達式,用 , 分隔。

序號 表達式 可用字段 描述
1 * 所有 匹配字段所有取值
2 */a 所有 匹配字段每遞增 a 后的值, 從字段最小值開始,包括最小值,比如小時(hour)的 */5,則匹配0,5,10,15,20
3 a/b 所有 匹配字段每遞增 b 后的值, 從字段值 a 開始,包括 a,比如小時(hour)的 2/9,則匹配2,11,20
4 a-b 所有 匹配字段 ab 之間的取值,a 必須小於 b,包括 ab,比如2-5,則匹配2,3,4,5
5 a-b/c 所有 匹配 ab 之間每遞增 c 后的值,包括 a,不一定包括 b,比如1-20/5,則匹配1,6,11,16
6 xth y day 匹配 y 在當月的第 x 次,比如 3rd fri 指當月的第三個周五
7 last x day 匹配 x 在當月的最后一次,比如 last fri 指當月的最后一個周五
8 last day 匹配當月的最后一天
9 x,y,z 所有 匹配以 , 分割的多個表達式的組合

例:

import datetime
from apscheduler.schedulers.background import BackgroundScheduler


def job1():
	print('job1')


def job2(x, y):
	print('job2', x, y)


scheduler = BackgroundScheduler()
scheduler.start()

# 每天 2 點運行
scheduler.add_job(
	job1,
	trigger='cron',
	hour=2
)

# 每天 2 點 30 分 5 秒運行
scheduler.add_job(
	job2,
	trigger='cron',
	second=5,
	minute=30,
	hour=2,
	args=['hello', 'world']
)

# 每 10 秒運行一次
scheduler.add_job(
	job1,
	trigger='cron',
	second='*/10'
)

# 每天 1:00,2:00,3:00 運行
scheduler.add_job(
	job1,
	trigger='cron',
	hour='1-3'
)

# 在 6,7,8,11,12 月的第三個周五 的 1:00,2:00,3:00 運行
scheduler.add_job(
	job1,
	trigger='cron',
	month='6-8,11-12',
	day='3rd fri',
	hour='1-3'
)

# 在 2019-12-31 號之前的周一到周五 5 點 30 分運行
scheduler.add_job(
	job1,
	trigger='cron',
	day_of_week='mon-fri',
	hour=5,
	minute=30,
	end_date='2019-12-31'
)

interval:以固定的時間間隔運行 job

主要參數:

weeks(int) - 表示等待時間的周數

days(int) - 表示等待時間天數

hours(int) - 表示等待時間小時數

minutes(int) - 表示等待時間分鍾數

seconds(int) - 表示等待時間秒數

start_date(date|datetime|str) - 開始時間

end_date(date|datetime|str) - 結束時間

例:

from apscheduler.schedulers.background import BackgroundScheduler


def job():
	print('job')


scheduler = BackgroundScheduler()
scheduler.start()

# 每 2 小時運行一次
scheduler.add_job(
	job,
	trigger='interval',
	hours=2
)

# 2019-10-01 00:00:00 到 2019-10-31 23:59:59 之間每 2 小時運行一次
scheduler.add_job(
	job,
	trigger='interval',
	hours=2,
	start_date='2019-10-01 00:00:00',
	end_date='2019-10-31 23:59:59',
)

# 每 2 天 3 小時 4 分鍾 5 秒 運行一次
scheduler.add_job(
	job,
	trigger='interval',
	days=2,
	hours=3,
	minutes=4,
	seconds=5
)

date:某個特定時間僅運行一次 job

例:

import datetime
from apscheduler.schedulers.background import BackgroundScheduler


def job():
	print('job')

scheduler = BackgroundScheduler()
scheduler.start()

# 3 秒后運行
scheduler.add_job(
	job,
	trigger='date',
	run_date=datetime.datetime.now() + datetime.timedelta(seconds=3)
)

# 2019.11.22 00:00:00 運行
scheduler.add_job(
	job,
	trigger='date',
	run_date=datetime.date(2019, 11, 22),
)

# 2019.11.22 16:30:01 運行
scheduler.add_job(
	job,
	trigger='date',
	run_date=datetime.datetime(2019, 11, 22, 16, 30, 1),
)

# 2019.11.31 16:30:01 運行
scheduler.add_job(
	job,
	trigger='date',
	run_date='2019-11-31 16:30:01',
)

# 立即運行
scheduler.add_job(
	job,
	trigger='date'
)

小提示:

如果想立即運行 job ,則可以在添加 job 時省略 trigger 參數;

添加 job 時的日期設置參數 start_date、end_date 以及 run_date 都支持字符串格式('2019-12-31' 或者 '2019-12-31 12:01:30')、datetime.date(datetime.date(2019, 12, 31)) 或者 datetime.datetime(datetime.datetime(2019, 12, 31, 16, 30, 1));

刪除 job

當調度器中刪除 job 時,該 job 也將從其關聯的 job 存儲中刪除,並且將不再執行。有兩種方法可以實現此目的:

  1. 通過調用方法 remove_job() ,指定 job ID 和 job 存儲別名
  2. 通過調用 add_job() 時 返回的 apscheduler.job.Job 實例的 remove() 方法

例:

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

或者:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

注意: 如果任務已經調度完畢,並且之后也不會再被執行的情況下,會被自動刪除。

暫停和恢復 job

暫停和恢復 job 與 刪除 job 方法類似:

暫停:

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
job.pause()		# or
scheduler.pause_job('my_job_id')

恢復:

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
job.resume()	# or
scheduler.resume_job('my_job_id')

獲取 job 列表

使用 get_jobs() 方法獲取一個列表,或者使用 print_jobs() 方法打印一個格式化的列表。

jobs = scheduler.get_jobs()	# or
scheduler.print_jobs()

提示:可以使用 get_job(id) 獲取單個 job 信息

修改 job

修改 job 依然與 刪除 job 方法類似,可以修改除 job id 以外的其他屬性。

例:

job.modify(max_instances=6, name='Alternate name')

如果想修改觸發器,可以使用 apscheduler.job.Job.reschedule 或者 apscheduler.schedulers.base.BaseScheduler.reschedule_job

例:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

其實修改 job 也可以使用 add_job() 方法,只需要指定參數 replace_existing=True 以及相同的 job_id 即可。

關閉調度器

關閉調度器方法:

scheduler.shutdown()

默認情況下,會關閉 job 存儲和執行器,並等待所有正在執行的 job 完成。如果不想等待則可以使用以下方法關閉:

scheduler.shutdown(wait=False)

暫停/恢復調度器

暫停調度器:

scheduler.pause()

恢復調度器:

scheduler.resume()

啟動調度器的時候可以指定 paused=True,以這種方式啟動的調度器直接就是暫停狀態。

scheduler.start(paused=True)

限制 job 並發執行實例數量

默認情況下,每個 job 僅允許 1 個實例同時運行。這意味着,如果該 job 將要運行,但是前一個實例尚未完成,則最新的 job 不會調度。可以在添加 job 時指定 max_instances 參數解除限制。

  • max_instances 可以在初始化調度器的時候設置一個全局默認值,添加任務時可以再單獨指定

job 合並

當由於某種原因導致某個 job 積攢了好幾次沒有實際運行(比如說系統掛了 5 分鍾后恢復,有一個任務是每分鍾跑一次的,按道理說這 5 分鍾內本來是“計划”運行 5 次的,但實際沒有執行),如果 coalesce 為 True,下次這個 job 被 submit 給 executor 時,只會執行 1 次,也就是最后這次,如果為 False,那么會執行 5 次(不一定,因為還有其他條件,看后面misfire_grace_time)。misfire_grace_time:單位為秒,假設有這么一種情況,當某一 job 被調度時剛好線程池都被占滿,調度器會選擇將該 job 排隊不運行,misfire_grace_time 參數則是在線程池有可用線程時會比對該 job 的應調度時間跟當前時間的差值,如果差值小於 misfire_grace_time 時,調度器會再次調度該 job;反之該 job 的執行狀態為 EVENTJOBMISSED 了,即錯過運行。

  • coalesce 與 misfire_grace_time 可以在初始化調度器的時候設置一個全局默認值,添加任務時可以再單獨指定

調度器事件

調度器事件只有在某些情況下才會被觸發,並且可以攜帶某些有用的信息。通過 add_listener() 傳遞適當參數,可以實現監聽不同是事件,比如 job 運行成功、運行失敗等。具體支持的事件類型見官方文檔

例:

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR


def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')


scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

擴展 APScheduler

APScheduler 的四種組件都可以自定義擴展:

  • triggers(觸發器)
  • jobstores (job 存儲)
  • executors (執行器)
  • schedulers (調度器)

具體方法參考官方文檔

分布式 APScheduler

APScheduler 默認是不支持分布式運行的,詳見官方 FAQ。當將其集成到 flask 或者 django 項目后,如果用 gunicorn 部署,gunicorn 可能會啟動多個 worker 從而導致 job 重復執行。gunicorn 配置參數 --preload 和 worker=1 后,只啟動一個 worker,可以適當緩解這個問題(這個方法有個問題:當自動重啟 worker 的時候,如果這時后台剛好有一個耗時任務正常執行,比如需要執行 30s,而系統中還有一個每秒執行的任務,這時就會丟失部分每秒執行的任務)。

那有沒有好的方法解決呢?肯定是有的,首先我們看看其基本原理:總的來說,其主要是利用 python threading Event 和 Lock 鎖來寫的。scheduler 在主循環 (_main_loop)中,反復檢查是否有需要執行的任務,完成任務的檢查函數為 _process_jobs,這個函數主要有以下幾個步驟:

1、 詢問儲存的每個 jobStore,是否有到期要執行的任務。

...
due_jobs = jobstore.get_due_jobs(now)
...

2、due_jobs 不為空,則計算這些 jobs 中每個 job 需要運行的時間點,時間一到就 submit 給任務調度。

...
run_times = job._get_run_times(now)
run_times = run_times[-1:] if run_times and job.coalesce else run_times
if run_times:
    try:
        executor.submit_job(job, run_times)
    except MaxInstancesReachedError:
...

3、在主循環中,如果不間斷地調用,而實際上沒有要執行的 job,這會造成資源浪費。因此在程序中,如果每次掉用 _process_jobs 后,進行了預先判斷,判斷下一次要執行的 job(離現在最近的)還要多長時間,作為返回值告訴 main_loop, 這時主循環就可以去睡一覺,等大約這么長時間后再喚醒,執行下一次 _process_jobs。

...
# Determine the delay until this method should be called again
if self.state == STATE_PAUSED:
    wait_seconds = None
    self._logger.debug('Scheduler is paused; waiting until resume() is called')
elif next_wakeup_time is None:
    wait_seconds = None
    self._logger.debug('No jobs; waiting until a job is added')
else:
    wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
    self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                       wait_seconds)

return wait_seconds

根據以上基本原理,其實可以發現重寫 _process_jobs 函數就能解決。主要思路是文件鎖,當 worker 准備獲取要執行的 job 時必須先獲取到文件鎖,獲取文件鎖后分配 job 到執行器后,再釋放文件鎖。具體代碼如下:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.base import MaxInstancesReachedError
from apscheduler.events import (
    JobSubmissionEvent, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES,
)
from apscheduler.util import (
    timedelta_seconds, TIMEOUT_MAX
)
from datetime import datetime, timedelta
import six
import fcntl
import os

#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
STATE_RUNNING = 1
#: constant indicating a scheduler's paused state (started but not processing jobs)
STATE_PAUSED = 2


class DistributedBackgroundScheduler(BackgroundScheduler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _process_jobs(self):
        """
        Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
        to wait for the next round.

        If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
        ``jobstore_retry_interval`` seconds.
        """
        if self.state == STATE_PAUSED:
            self._logger.debug('pid: %s Scheduler is paused -- not processing jobs' % os.getpid())
            return None
        f = None
        try:
            f = open("scheduler.lock", "wb")
            # 這里必須使用 lockf, 因為 gunicorn 的 worker 進程都是 master 進程 fork 出來的
            # flock 會使子進程擁有父進程的鎖
            # fcntl.flock(flock, fcntl.LOCK_EX | fcntl.LOCK_NB)
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            self._logger.info("pid: %s get Scheduler file lock success" % os.getpid())
        except BaseException as exc:
            self._logger.warning("pid: %s get Scheduler file lock error: %s" % (os.getpid(), str(exc)))
            try:
                if f:
                    f.close()
            except BaseException:
                pass
            return None
        else:
            self._logger.debug('pid: %s Looking for jobs to run' % os.getpid())
            now = datetime.now(self.timezone)
            next_wakeup_time = None
            events = []

            with self._jobstores_lock:
                for jobstore_alias, jobstore in six.iteritems(self._jobstores):
                    try:
                        due_jobs = jobstore.get_due_jobs(now)
                    except Exception as e:
                        # Schedule a wakeup at least in jobstore_retry_interval seconds
                        self._logger.warning('pid: %s Error getting due jobs from job store %r: %s',
                                             os.getpid(), jobstore_alias, e)
                        retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                        if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                            next_wakeup_time = retry_wakeup_time

                        continue

                    for job in due_jobs:
                        # Look up the job's executor
                        try:
                            executor = self._lookup_executor(job.executor)
                        except BaseException:
                            self._logger.error(
                                'pid: %s Executor lookup ("%s") failed for job "%s" -- removing it from the '
                                'job store', os.getpid(), job.executor, job)
                            self.remove_job(job.id, jobstore_alias)
                            continue

                        run_times = job._get_run_times(now)
                        run_times = run_times[-1:] if run_times and job.coalesce else run_times
                        if run_times:
                            try:
                                executor.submit_job(job, run_times)
                            except MaxInstancesReachedError:
                                self._logger.warning(
                                    'pid: %s Execution of job "%s" skipped: maximum number of running '
                                    'instances reached (%d)', os.getpid(), job, job.max_instances)
                                event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                           jobstore_alias, run_times)
                                events.append(event)
                            except BaseException:
                                # 分配任務錯誤后馬上釋放文件鎖,讓其他 worker 搶占
                                try:
                                    fcntl.flock(f, fcntl.LOCK_UN)
                                    f.close()
                                    self._logger.info("pid: %s unlocked Scheduler file success" % os.getpid())
                                except:
                                    pass
                                self._logger.exception('pid: %s Error submitting job "%s" to executor "%s"',
                                                       os.getpid(), job, job.executor)
                                break
                            else:
                                event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                           run_times)
                                events.append(event)

                            # Update the job if it has a next execution time.
                            # Otherwise remove it from the job store.
                            job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                            if job_next_run:
                                job._modify(next_run_time=job_next_run)
                                jobstore.update_job(job)
                            else:
                                self.remove_job(job.id, jobstore_alias)

                    # Set a new next wakeup time if there isn't one yet or
                    # the jobstore has an even earlier one
                    jobstore_next_run_time = jobstore.get_next_run_time()
                    if jobstore_next_run_time and (next_wakeup_time is None or
                                                   jobstore_next_run_time < next_wakeup_time):
                        next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

            # Dispatch collected events
            for event in events:
                self._dispatch_event(event)

            # Determine the delay until this method should be called again
            if next_wakeup_time is None:
                wait_seconds = None
                self._logger.debug('pid: %s No jobs; waiting until a job is added', os.getpid())
            else:
                wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
                self._logger.debug('pid: %s Next wakeup is due at %s (in %f seconds)', os.getpid(), next_wakeup_time,
                                   wait_seconds)
            try:
                fcntl.flock(f, fcntl.LOCK_UN)
                f.close()
                self._logger.info("pid: %s unlocked Scheduler file success" % os.getpid())
            except:
                pass

        return wait_seconds

文件鎖只支持 unix/linux 系統,並且只能實現本機的分布式。如果想實現多台主機的的分布式,需要借助 redis 或者 zookeeper 實現分布鎖,原理和文件鎖一樣的,都是重寫 _process_jobs 函數實現,代碼就不再贅述,有興趣的朋友可以自己研究一下。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM