調用異步任務的三種方法
第一種
調用異步任務有三種方法,前面我們使用的是task.delay(),這是apply_async方法的別名,但接受的參數較為簡單
第二種
我們常用的是task.apply_async(args=[arg1,args],kwargs={key:value}):可以接受復雜的參數
這種可以接收的參數有:
- task_id:為任務分配唯一id,默認是uuid
- countdown:設置該任務等待一段時間在執行,單位為秒
- eta:定義任務的開始時間,eta=time.time()+5,單位為秒,是UTC時間,設置成國內時間也沒有用
- expires:設置任務過期時間,任務在過期時間后還沒有執行則被丟棄,單位為秒
- retry:如果任務失敗后,是否重試,默認為True
- shadow:重新指定任務的名字,覆蓋其在日志中使用的任務名稱
- retry_policy:{} 重試策略,max_retries:最大重試次數,默認為3次。interval_start:重試等待的時間間隔,默認為0。interval_step:每次重試讓重試間隔增加的秒數,默認為0.2秒。interval_max:重試間隔最大的秒數,既通過interval_step增大到多少秒之后,就不在增加了,默認為0.2秒。
- routing_key:自定義路由鍵
- queue:指定發送到哪個隊列
- exchange:指定發送到哪個交換機
- priority:任務隊列的優先級,0到255之間,對於rabbitmq來說0是最高優先級
- headers:為任務添加額外的消息
還是使用前面的例子,使用task1和task2兩個任務,demo.py調用
在demo.py里更改調用方式
from apps.task1 import add from apps.task2 import subs if __name__ == '__main__': add.delay(3,5) subs.apply_async(args=[55,22], task_id='aaaaa2222', countdown=5, shadow = 'zouzou' )
執行結果
第三種
app.send_task(task1.add,args=[1,2])
不建議用,因為不會校驗是否存在這個方法,直接就發送成功里,celery執行就會報錯
task參數
task常用參數
- name:可以顯示指定任務的名字,默認是本函數的名字,也就是上面的 shadow
- bind:一個bool值,設置是否綁定一個task的實例,如果綁定,task實例會作為參數傳遞到任務方法中(第一個參數為self),可以訪問task實例的所有屬性。
- base:定義任務的基類,可以以此來定義回調函數,默認是Task類,我們也可以定義自己的Task類
- default_retry_delay:設置該任務重試的延遲時間,當任務執行失敗后,會自動重試,單位是秒,默認為3分鍾
task不常用參數
- serializer:指定本任務的序列化的方法
- autoretry_for:設置在特定異常時重試任務,默認False不重試。
- retry_backoff:默認Flase,設置重試時的延遲時間間隔策略
- retry_backoff_max:設置最大延遲重試時間,默認10分鍾,如果失敗則不在重試
- retry_jitter:默認為True,既引入抖動,避免重試任務集中執行
Task的一般屬性
- Task.name:任務名稱
- Task.request:當前任務的信息
- Task.max_retries:設置重試的最大次數
- Task.rhrows:預期錯誤類的可選元組
- Task.rate_limit:設置任務類型的速度限制
- Task.time_limit:此任務的硬限時,單位為秒
- Task.serializer:標識要使用的默認序列化方法的字符串
修改task1.py的內容如下
from apps import app import celery celery.Task # Task的屬性在這里面 class BaseTask(celery.Task): def on_failure(self, exc, task_id, args, kwargs, einfo): print('執行task失敗') def on_success(self, retval, task_id, args, kwargs): print(f'執行task成功,task id為:{task_id}') @app.task(name='wahaha',bind=True,base=BaseTask) def add(self,x,y): print(self.request.id) # task_id return x+y
啟動celery worker
執行demo.py
from apps.task1 import add from apps.task2 import subs if __name__ == '__main__': add.delay(3,5) subs.apply_async(args=[55,33])