django apscheduler 定時任務(下篇)


參數

  • scheduler: 指定調度器
  • trigger: 任務執行的方式,共有三種:'date'、'interval'、'cron'。
    • 'date' + 'run_date' 的參數組合, 能實現單次任務。 例子:2019-07-07 22:49:00 執行任務 @register_job(scheduler, 'date', id='test', run_date='2019-07-07 22:49:00') 注:在親測時,執行完任務會報錯,原因時執行完任務后會去mysql中刪除djangojob表中的任務。但是djangojobexecution表記錄着執行結果,有外鍵關聯着djangojob表,所以刪除時顯示有外鍵約束錯誤。但是任務會正常執行,執行之后也會正常刪除。
    • 'interval' + 'hours' + 'minutes' + ..... 的參數組合,能實現間隔性任務。 例子:每隔3個半小時執行任務 還有seconds,days參數可以選擇 注:如果任務需要執行10秒,而間隔設置為1秒,它是不會給你開10個線程同時去執行10個任務的。它會錯過其他任務直到當前任務完成。
    • @register_job(scheduler, 'interval', id='test', hours=3, minutes=30)
    • 'cron' + 'hour' + 'minute'+...的參數組合,能實現cron類的任務。 例子:每天的8點半執行任務 還有day,second,month等參數可以選擇。
    • @register_job(scheduler, 'cron', id='test', hour=8, minute=30)
  • id: 任務的名字,不傳的話會自動生成。不過為了之后對任務進行暫停、開啟、刪除等操作,建議給一個名字。並且是唯一的,如果多個任務取一個名字,之前的任務就會被覆蓋。
  • args: list類型。執行代碼所需要的參數。
  • next_run_time:datetime類型。開始執行時間。如果你現在創建一個定時任務,想3天后凌晨三點半自動給你女朋友發微信,那就需要這個參數了。

還有些其他的參數感興趣的同學可以查看源代碼來了解。

 

 

    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        """
        add_job(func, trigger=None, args=None, kwargs=None, id=None, \
            name=None, misfire_grace_time=undefined, coalesce=undefined, \
            max_instances=undefined, next_run_time=undefined, \
            jobstore='default', executor='default', \
            replace_existing=False, **trigger_args)

        Adds the given job to the job list and wakes up the scheduler if it's already running.

        Any option that defaults to ``undefined`` will be replaced with the corresponding default
        value when the job is scheduled (which happens when the scheduler is started, or
        immediately if the scheduler is already running).

        The ``func`` argument can be given either as a callable object or a textual reference in
        the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
        importable module and the second half is a reference to the callable object, relative to
        the module.

        The ``trigger`` argument can either be:
          #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
            any extra keyword arguments to this method are passed on to the trigger's constructor
          #. an instance of a trigger class

        :param func: callable (or a textual reference to one) to run at the given time
        :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
            ``func`` is called
        :param list|tuple args: list of positional arguments to call func with
        :param dict kwargs: dict of keyword arguments to call func with
        :param str|unicode id: explicit identifier for the job (for modifying it later)
        :param str|unicode name: textual description of the job
        :param int misfire_grace_time: seconds after the designated runtime that the job is still
            allowed to be run (or ``None`` to allow the job to run no matter how late it is)
        :param bool coalesce: run once instead of many times if the scheduler determines that the
            job should be run more than once in succession
        :param int max_instances: maximum number of concurrently running instances allowed for this
            job
        :param datetime next_run_time: when to first run the job, regardless of the trigger (pass
            ``None`` to add the job as paused)
        :param str|unicode jobstore: alias of the job store to store the job in
        :param str|unicode executor: alias of the executor to run the job with
        :param bool replace_existing: ``True`` to replace an existing job with the same ``id``
            (but retain the number of runs from the existing one)
        :rtype: Job

        """
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time
        }
        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
                          value is not undefined)
        job = Job(self, **job_kwargs)

        # Don't really add jobs to job stores before the scheduler is up and running
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                self._pending_jobs.append((job, jobstore, replace_existing))
                self._logger.info('Adding job tentatively -- it will be properly scheduled when '
                                  'the scheduler starts')
            else:
                self._real_add_job(job, jobstore, replace_existing)

        return job

  

如何頁面設定,定時任務時間

 

 通常我們希望頁面配置所以一般使用add_job進行定時任務創建和管理;

from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
scheduler = BackgroundScheduler()  # 創建一個調度器對象
scheduler.add_jobstore(DjangoJobStore(), "default") # 添加一個作業

if not scheduler.state:
scheduler.start()

  

 
class ScheduleView(CustomViewSet):
   lookup_field = 'id'
   queryset = ScheduleTrigger.objects.all()
   serializer_class = ScheduleSerializer
   parser_classes =  [JSONParser, FormParser]

   def destroy(self, request, *args, **kwargs):
       instance = self.get_object()
       job_id = str(instance.id)
       if scheduler.get_job(job_id=job_id) :
           scheduler.remove_job(job_id=job_id)
       self.perform_destroy(instance)
       return CustomResponse(data=[],status=200,msg="刪除ok",success=True)


   def create(self, request, *args, **kwargs):
       """  cron tigger need rewrite ,default */10"""
       serializer = self.get_serializer(data=request.data)
       serializer.is_valid(raise_exception=True)
       self.perform_create(serializer)
       try:

           job_id =serializer.data.get('id')
           if serializer.data.get("enable"):
               if serializer.data.get('schedule_type') == '1':
                   scheduler.add_job(test,
                                     trigger=DateTrigger(run_date=serializer.data.get('schedule_time')),
                                     id=str(job_id),
                                     max_instances=1, replace_existing=True, args=serializer.data.get('schedule_args'))
               if serializer.data.get('schedule_type') == '2':
                   scheduler.add_job(test,
                                     trigger=IntervalTrigger(seconds=int(serializer.data.get('schedule_time'))),
                                     id=str(job_id),
                                     max_instances=1, replace_existing=True, args=serializer.data.get('schedule_args'))

               if serializer.data.get('schedule_type') == '3':
                    scheduler.add_job(test,
                                 trigger=CronTrigger(second="*/10"),
                                 id=str(job_id),
                                 max_instances=1, replace_existing=True, args=serializer.data.get('schedule_args'))
               register_job(scheduler)


       except Exception:
           pass
       headers = self.get_success_headers(serializer.data)
       return CustomResponse(data=serializer.data, code=200,
                             msg='ok', success=True,headers=headers)



   def patch(self, request, *args, **kwargs):
       """  cron tigger need rewrite ,default */10"""
       instance = self.get_object()
       job_id =  instance.id
       enable = request.data.get("enable")
       if scheduler.get_job(job_id=str(job_id)):
           if not enable:
                scheduler.pause_job(job_id=str(job_id))
           if enable:
               scheduler.resume_job(job_id=str(job_id))
       else:
           if enable:
               if instance.schedule_type =='1':
                   scheduler.add_job(test,
                                     trigger=DateTrigger(run_date=instance.schedule_time),
                                     id=str(job_id),
                                     max_instances=1, replace_existing=True, args=instance.schedule_args)
               if instance.schedule_type == '2':
                   scheduler.add_job(test,
                                     trigger=IntervalTrigger(seconds=int(instance.schedule_time)),
                                     id=str(job_id),
                                     max_instances=1, replace_existing=True, args=instance.schedule_args)
               if instance.schedule_type == '3':

                   scheduler.add_job(test,
                                 trigger=CronTrigger(second="*/10"),
                                 id=str(job_id),
                                 max_instances=1, replace_existing=True, args=instance.schedule_args)
               register_job(scheduler)


       kwargs['partial'] = True
       response =  self.update(request, *args, **kwargs)
       return CustomResponse(data=response.data, code=200,
                             msg='ok', success=True)

  

 
 

其他功能

django-apscheduler框架還提供了很多操作定時任務的函數。比如:

add_job  創建任務

  • 刪除任務 scheduler.remove_job(job_name)
  • 暫停任務 scheduler.pause_job(job_name)
  • 重新開啟任務 scheduler.resume_job(job_name)
  • 修改任務 scheduler.modify_job(job_name) 注:修改任務只能修改參數,如果要修改執行時間的話,就把任務刪了重新創建。

可以在頁面上做一個這樣的表格,再加上簡單的前后端交互就可以讓用戶自行管理定時任務:

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM