celery使用的一些小坑和技巧(非從無到有的過程)


純粹是記錄一下自己在剛開始使用的時候遇到的一些坑,以及自己是怎樣通過配合redis來解決問題的。
文章分為三個部分,一是怎樣跑起來,並且怎樣監控相關的隊列和任務;二是遇到的幾個坑;三是給一些自己配合redis使用的代碼示例。

一.celery使用:
  Ⅰ.把任務中間件服務器跑起來,rabbitmq-server
    跑起來以后,就能在瀏覽器(http://localhost:15672/#/queues)里面看中間件里面的相關內容了。
    (如果想把這邊的某些隊列下面的沒有跑完的任務丟棄掉的話,進對應的隊列,點擊delete)
  Ⅱ.在終端里面對應路徑下面把celery的任務隊列跑起來:
    eg:celery -A celery_autowork_task worker --loglevel=info -Q auto_work
    注意里面的各個參數的意義
  Ⅲ.看具體的每個任務的執行情況,是否成功之類的
    1.安裝flower,pip3 install flower
    2.用celery把flower跑起來celery flower
    3.http://localhost:5555在這個鏈接,或者終端里面看見的就能看具體的每個任務的執行情況
二.坑: 
  Ⅰ.異常場景:並發數設置成4,然后第一次進去隊列的時候是用的第一個,retry的時候,居然在4和2里面都觸發了任務。也就是關了一個,開了兩個。
  異常原因:在exception的代碼段里面加了retry功能, 然后在try的代碼段里面也加了try功能,但是查看retry函數本身的定義,發現它的類型就是exception;也就是調用這個本身就是一個異常會走到處理異常的代碼段里面去;所以更好的方式是只在exception的代碼段里面retry,然后如果try里面需要retry的話,直接用raise 搞一個異常出來;
  Ⅱ.異常場景:在task裝飾器里面定義重試間隔時間default_retry_delay,或者在retry里面指定countdown為5秒,但是在終端里面看見的一個任務的兩次開始執行時間發現都會大於5秒。開始還以為是不是兩個設置會互相影響。
  異常原因:后來發現的default_retry_dalay是在retry的時候沒有傳參數的的默認重試間隔時間,可以看模塊代碼http://docs.celeryproject.org/en/latest/_modules/celery/app/task.html?highlight=task;但之所以一個任務的兩次開始執行時間大於5秒是因為,我們的retry一般都寫在exception的代碼塊里面,而這個間隔時間,其實指的是,執行到retry的時間往后推5秒。try里面的操作會有耗時,所以導致了兩次任務的開始時間大於5秒。那么如果想要兩次的開始時間為5秒的話,可以在進入任務的時候進行一下計時,然后任務重試的時候看整個try的代碼塊的耗時。
  Ⅲ.異常場景:在task裝飾器里面提供了max_retries最大重試次數,然后到達最大重試次數(比如10次)以后,在第十次的重試里面仍然會走到retry方法,這個時候,居然在終端開間異常拋出了。
  異常原因:按我的個人理解,這種設定了10次重試次數,但是在第十次開啟第11次的重試時候,應該模塊是自動不讓重試才對的。可是貌似沒有完成,或者可能是我沒有搜索到正確的使用方法;解決辦法就是自己在重試之前看當前的重試次數,自己控制
  Ⅳ.異常場景:如果一次任務在第一次的發起,重試次數還沒到的時候想要第二次發起怎么辦?就是一般來說還是只允許一個實際任務只有一個隊列里面的任務處理它
  處理辦法:不管是delay還是apply_async方法都是有返回值的,這個返回值的是celery里面的一個類,可以string以后得到任務的ID,然后之后用app.control里面的revoke方法把對應的任務中斷掉。可以查看對應模塊的源代碼http://docs.celeryproject.org/en/latest/_modules/celery/app/control.html
三.代碼示例
  celery_autowork_task.py【任務執行方法的定義的地方】

from celery import Celery
AutoWork = Celery('auto_work', broker = 【CELERY_BROKER】, backend = '')
AutoWork.conf.CELERY_TIMEZONE              = 'Asia/Shanghai'  # 時區
AutoWork.conf.CELERYD_CONCURRENCY          = 4                # 任務並發數
AutoWork.conf.CELERYD_TASK_SOFT_TIME_LIMIT = 300              # 任務超時時間
AutoWork.conf.CELERY_DISABLE_RATE_LIMITS   = True             # 任務頻率限制開關

AutoWork.conf.CELERY_ROUTES = {                               # 任務調度隊列
    "autowork_check_barcode_recharge":{"queue":"auto_work"},
}

@AutoWork.task(bind=True,name="autowork_check_barcode_recharge",max_retries=15)
def autowork_check_barcode_recharge(self,recharge_id):
    time_begin=datetime.datetime.now()
    try:
        pass
    except Exception as exc:
        retries=self.request.retries
        if retries<self.max_retries:
            delta_second=(datetime.datetime.now()-time_begin).seconds
            if delta_second<5:
                return self.retry(exc = exc,countdown=5-delta_second)
            else:
                return self.retry(exc = exc,countdown=0)
    finally:
        pass

 

  test.py【調用任務以及檢查任務的執行情況】

def cancel_pre_celery_task_and_excute_next_task(self,recharge_record):
        recharge_work_guid="recharge_work_guid:%d:%s"%(recharge_record.shop_id,recharge_record.num)
        from handlers.celery_autowork_task import autowork_check_barcode_recharge
        if redis.get(recharge_work_guid):  #上次是否有執行這個實際業務中的任務
            from celery_autowork_task import AutoWork
            AutoWork.control.revoke(redis.get(recharge_work_guid).decode('utf-8'),terminate=True) #如果有執行,則中斷
        autowork_guid=autowork_check_barcode_recharge.delay(recharge_record.id) #得到本次的任務GUID
        redis.set(recharge_work_guid,str(autowork_guid)) #保存到redis里面去
        redis.expire(recharge_work_guid,3600)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM