Python APScheduler: installation, usage, and source-code notes


Our project uses APScheduler as its core scheduling module, so we did some investigation of it, including at the source level.

 

1. Why APScheduler?

We took on faith the saying that APScheduler is to Python what Quartz is to Java. In practice it has worked out well.

 

2. Installation

# Install with pip
$ pip install apscheduler
# Or build from source
$ wget https://pypi.python.org/pypi/APScheduler/#downloads
$ python setup.py install

 

3. APScheduler's four main components

1) trigger: decides when a job fires

2) job stores: where jobs are kept (in memory or persisted)

3) executor: runs the jobs (implemented as thread or process pools on top of concurrent.futures)

4) schedulers: coordinate the other components (background and blocking are the most common)

Let's start with an example:

# -*- coding:utf-8 -*-
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED


class ScheduleFactory(object):
    def __init__(self):
        # Cache a single scheduler on the class so every ScheduleFactory
        # hands out the same instance (simple singleton).
        if not hasattr(ScheduleFactory, '_scheduler'):
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        jobstores = {
            # Connection arguments are passed through to redis-py;
            # db=2 selects the Redis database used for job storage.
            'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
                                        job_defaults=job_defaults, daemonic=False)
        return scheduler

Note: in the example above, ScheduleFactory is implemented as a singleton, so the scheduler object it creates is globally unique.

 

4. Choosing a scheduler

Only two scenarios are covered here:

1) BackgroundScheduler: runs scheduling in a background thread; when the parent process that created the scheduler exits, scheduling stops with it. Use case: embedded in a service, e.g. Django.

2) BlockingScheduler: blocks the process that created the scheduler. Use case: the program's only job is scheduling.

Once you have chosen a scheduler:

1) scheduler.start() starts the scheduler.

2) scheduler.shutdown() stops it. By default it waits for all currently executing jobs to finish before returning; pass wait=False to skip the wait.

The program now looks like this:

class ScheduleFactory(object):
    def __init__(self):
        # Cache a single scheduler on the class (simple singleton).
        if not hasattr(ScheduleFactory, '_scheduler'):
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        jobstores = {
            'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()

 

5. Choosing job stores

There are two broad directions:

1) Non-persistent

Available store: MemoryJobStore

Suitable when you do not start and stop the scheduler frequently, and missing some scheduled runs is acceptable.

2) Persistent

Available stores: SQLAlchemyJobStore, RedisJobStore, MongoDBJobStore, ZooKeeperJobStore

Suitable when missing scheduled runs is not acceptable.

Job stores are configured as a dictionary, for example:

jobstores = {
    'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

The keys are the names you give each store; when adding a job you can then point it at a particular store. The example below uses the store with key 'default':

def add_job(self, job_func, interval, id, job_func_params=None):
    self.scheduler.add_job(job_func, jobstore='default', trigger='interval',
                           seconds=interval, id=id, kwargs=job_func_params,
                           executor='default', misfire_grace_time=30)

 

6. Choosing an executor

Only two are covered here: thread pool and process pool. The default ('default') is a thread pool. max_workers is the actual concurrency for executing jobs; if it is set too low and you add many jobs, scheduled runs can be missed.

Also, because of Python's GIL, compute-bound jobs in a thread pool will not reach the configured concurrency, so tune this number to your actual workload.

We generally set the concurrency to 30, which is fine for ordinary scenarios.

executors = {
    'default': ThreadPoolExecutor(max_workers=30),
    'processpool': ProcessPoolExecutor(max_workers=30)
}

Likewise, add_job lets you pick the executor for a job:

def add_job(self, job_func, interval, id, job_func_params=None):
    self.scheduler.add_job(job_func, jobstore='default', trigger='interval',
                           seconds=interval, id=id, kwargs=job_func_params,
                           executor='default', misfire_grace_time=30)

 

7. Choosing a trigger

This is the simplest choice. There are three kinds, and they need no extra configuration:

1. date: run once, at one specific point in time

2. interval: run repeatedly, a fixed length of time apart

3. cron: run on a cron-style schedule

 

8. The APIs for adding, removing, modifying, and querying jobs are covered in the manual:

http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s

 

9. Problems and fixes

1)2017-07-24 14:06:28,480 [apscheduler.executors.default:120] [WARNING]- Run time of job "etl_func (trigger: interval[0:01:00], next run at: 2017-07-24 14:07:27 CST)" was missed by 0:00:01.245424

The source fragment behind this warning is:

def run_job(job, jobstore_alias, run_times, logger_name):
    """
    Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
    scheduler.

    """
    events = []
    logger = logging.getLogger(logger_name)
    for run_time in run_times:
        # See if the job missed its run time window, and handle
        # possible misfires accordingly
        if job.misfire_grace_time is not None:
            difference = datetime.now(utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            if difference > grace_time:
                events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
                                                run_time))
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)
        try:
            retval = job.func(*job.args, **job.kwargs)
        except:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = ''.join(format_tb(tb))
            events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
                                            exception=exc, traceback=formatted_tb))
            logger.exception('Job "%s" raised an exception', job)
        else:
            events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
                                            retval=retval))
            logger.info('Job "%s" executed successfully', job)

    return events

The key parameter here is misfire_grace_time, which defaults to 1 second. If the gap between a job's scheduled run time and when it actually starts exceeds misfire_grace_time, the run is logged as a warning and skipped!

Why does this happen?

1) The executor's concurrency is too low for the number of jobs you have added.

2) misfire_grace_time is still too small.

 

2) If you use trigger='interval' with something like misfire_grace_time=30, and the scheduler first starts at, say, 10:50, the scheduled times and the actual runs can drift apart by up to a minute.

How do you solve this? Use next_run_time to set the first scheduled run, rounding it to a whole minute. For example:

def add_job(self, job_func, interval, id, job_func_params=None):
    # Round the first run up to the next whole minute.
    next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
    next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
    self.scheduler.add_job(job_func, jobstore='default', trigger='interval',
                           seconds=interval, id=id, kwargs=job_func_params,
                           executor='default', next_run_time=next_run_time,
                           misfire_grace_time=30)

 

3)2017-07-25 11:02:00,003 [apscheduler.scheduler:962] [WARNING]- Execution of job "rule_func (trigger: interval[0:01:00], next run at: 2017-07-25 11:02:00 CST)" skipped: maximum number of running instances reached (1)

The corresponding source:

for job in due_jobs:
    # Look up the job's executor
    try:
        executor = self._lookup_executor(job.executor)
    except:
        self._logger.error(
            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
            'job store', job.executor, job)
        self.remove_job(job.id, jobstore_alias)
        continue

    run_times = job._get_run_times(now)
    run_times = run_times[-1:] if run_times and job.coalesce else run_times
    if run_times:
        try:
            executor.submit_job(job, run_times)
        except MaxInstancesReachedError:
            self._logger.warning(
                'Execution of job "%s" skipped: maximum number of running '
                'instances reached (%d)', job, job.max_instances)
            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                       jobstore_alias, run_times)
            events.append(event)

The source of submit_job:

with self._lock:
    if self._instances[job.id] >= job.max_instances:
        raise MaxInstancesReachedError(job)

    self._do_submit_job(job, run_times)
    self._instances[job.id] += 1

What does this mean? When one scheduling pass would run more instances of a job than max_instances allows, this exception is raised and the submission is skipped. For example, when job1's 10:00 run comes up, the scheduler may find two runs queued for it. How can that happen? It can: the 09:59 run may have failed to execute but was persisted, so at 10:00 the scheduler tries to catch it up as well.

max_instances defaults to 1. If you want such overlaps to be tolerated, set max_instances higher, e.g. max_instances=3.

 

10. If you want to monitor your scheduling, APScheduler provides a listener mechanism that can watch for these exceptions. Just register a listener:

    def add_err_listener(self):
        self.scheduler.add_listener(
            err_listener,
            EVENT_JOB_MAX_INSTANCES | EVENT_JOB_MISSED | EVENT_JOB_ERROR)

def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, scheduled_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' % (ev.job_id)
    rs = RobotSender()
    rs.send(
        "https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
        u"[apscheduler error] details: %s" % (msg),
        '15210885002',
        False
    )

 

The final code:

# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from alarmkits.send_robot import RobotSender


class ScheduleFactory(object):
    def __init__(self):
        # Cache a single scheduler on the class (simple singleton).
        if not hasattr(ScheduleFactory, '_scheduler'):
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        jobstores = {
            'redis': RedisJobStore(db=2, host='10.94.99.56', port=6379),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, daemonic=False)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()

    def add_job(self, job_func, interval, id, job_func_params=None):
        next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
        next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
        self.scheduler.add_job(
                job_func,
                jobstore='default',
                trigger='interval',
                seconds=interval,
                id=id,
                kwargs=job_func_params,
                executor='default',
                next_run_time=next_run_time,
                misfire_grace_time=30,
                max_instances=3
        )

    def remove_job(self, id):
        self.scheduler.remove_job(id)

    def modify_job(self, id, interval):
        # Changing the interval means replacing the trigger, so use reschedule_job.
        self.scheduler.reschedule_job(job_id=id, trigger='interval', seconds=interval)

    def add_err_listener(self):
        self.scheduler.add_listener(err_listener, EVENT_JOB_MAX_INSTANCES|EVENT_JOB_MISSED|EVENT_JOB_ERROR)

def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, scheduled_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' %(ev.job_id)
    rs = RobotSender()
    rs.send(
        "https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
        u"[apscheduler error] details: %s" % (msg),
        '15210885002',
        False
    )

 

