python定時任務模塊-crontab


python定時任務模塊-crontab

特點:使用linux系統自帶的定時管理模塊,簡單配置,靈活使用,輕松上手,可以解決開發過程中的定時需求問題.

使用:

安裝:pip install python-crontab
導入:from crontab import CronTab
實例化對象:cron = CronTab(user=True)
此時的crontab用戶是當前用戶,定時任務的增刪改操作都是基於當前用戶的.
新增定時任務:
              job = cron.new(command=command_line)
              # 設置任務執行周期
              job.setall(time_str)
              # 給任務添加一個標識,給任務設置comment,這樣就可以根據comment查詢
              job.set_comment(comment_name)
              # 將crontab寫入配置文件
              cron.write_to_user() # 指定用戶,寫入指定用戶下的crontab任務
刪除定時任務:
            # 按照名字進行指定定時任務刪除
            cron.remove_all(comment=comment_name)
完整例子:
          class Crontab_Update(object):

            def __init__(self):
                # 創建當前用戶的crontab,當然也可以創建其他用戶的,但得有足夠權限
                self.cron = CronTab(user=True)

            def add_crontab_job(self, command_line, time_str, comment_name):
                # 創建任務
                job = self.cron.new(command=command_line)
                # 設置任務執行周期
                job.setall(time_str)
                # 給任務添加一個標識,給任務設置comment,這樣就可以根據comment查詢
                job.set_comment(comment_name)
                # 將crontab寫入配置文件
                self.cron.write_to_user() # 指定用戶,寫入指定用戶下的crontab任務

            def del_crontab_jobs(self, comment_name):
                # 根據comment查詢,當時返回值是一個生成器對象,
                # 不能直接根據返回值判斷任務是否存在,
                # 如果只是判斷任務是否存在,可直接遍歷my_user_cron.crons
                # 返回所有的定時任務,返回的是一個列表
                # 按comment清除定時任務
                # self.cron.remove_all(comment=comment_name)
                # 按comment清除多個定時任務,一次write即可
                self.cron.remove_all(comment=comment_name)
                self.cron.remove_all(comment=comment_name+ ' =')
                self.cron.write_to_user() # 指定用戶,刪除指定用戶下的crontab任務

            def del_all_crontab_jobs(self):
                # 指定用戶,刪除指定用戶下所有的crontab任務
                self.cron.remove_all()
                self.cron.write_to_user()


      def get_period(period):
          ret = None
          if period == '5min':
              ret = '*/5 * * * *'
          elif period == 'hour':
              ret = '0 * * * *'
          elif period == 'day':
              ret = '0 8 * * *'

          return ret

      def update_crontab(clu_nid, add_nid, period, del_nid):
          command_line = EXEC_ENV + EXEC_FILE + clu_nid

          # 創建一個實例
          crontab_update = Crontab_Update()
          # 調用函數新增一個crontab任務
          try:
              if del_nid == '*':
                  crontab_update.del_all_crontab_jobs()
              elif del_nid:
                  crontab_update.del_crontab_jobs(del_nid)
              if add_nid:
                  period_time = get_period(period)
                  if not period_time:
                      return None
                  crontab_update.add_crontab_job(command_line, period_time, add_nid)

          except Exception as err:
              logger.error(err)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM