celery--調用異步任務的三種方法和task參數


調用異步任務的三種方法

第一種

調用異步任務有三種方法,前面我們使用的是task.delay(),這是apply_async方法的別名,但接受的參數較為簡單

第二種

我們常用的是task.apply_async(args=[arg1,args],kwargs={key:value}):可以接受復雜的參數

這種可以接收的參數有:

  • task_id:為任務分配唯一id,默認是uuid
  • countdown:設置該任務等待一段時間在執行,單位為秒
  • eta:定義任務的開始時間,eta=time.time()+5,單位為秒,是UTC時間,設置成國內時間也沒有用
  • expires:設置任務過期時間,任務在過期時間后還沒有執行則被丟棄,單位為秒
  • retry:如果任務失敗后,是否重試,默認為True
  • shadow:重新指定任務的名字,覆蓋其在日志中使用的任務名稱
  • retry_policy:{} 重試策略,max_retries:最大重試次數,默認為3次。interval_start:重試等待的時間間隔,默認為0。interval_step:每次重試讓重試間隔增加的秒數,默認為0.2秒。interval_max:重試間隔最大的秒數,既通過interval_step增大到多少秒之后,就不在增加了,默認為0.2秒。
  • routing_key:自定義路由鍵
  • queue:指定發送到哪個隊列
  • exchange:指定發送到哪個交換機
  • priority:任務隊列的優先級,0到255之間,對於rabbitmq來說0是最高優先級
  • headers:為任務添加額外的消息

還是使用前面的例子,使用task1和task2兩個任務,demo.py調用

在demo.py里更改調用方式

from apps.task1 import add
from apps.task2 import subs

if __name__ == '__main__':
    add.delay(3,5)
    subs.apply_async(args=[55,22],
                     task_id='aaaaa2222',
                     countdown=5,
                     shadow = 'zouzou'
                     )

執行結果

 第三種

app.send_task(task1.add,args=[1,2])

不建議用,因為不會校驗是否存在這個方法,直接就發送成功里,celery執行就會報錯

task參數

task常用參數
  • name:可以顯示指定任務的名字,默認是本函數的名字,也就是上面的 shadow
  • bind:一個bool值,設置是否綁定一個task的實例,如果綁定,task實例會作為參數傳遞到任務方法中(第一個參數為self),可以訪問task實例的所有屬性。
  • base:定義任務的基類,可以以此來定義回調函數,默認是Task類,我們也可以定義自己的Task類
  • default_retry_delay:設置該任務重試的延遲時間,當任務執行失敗后,會自動重試,單位是秒,默認為3分鍾
task不常用參數
  • serializer:指定本任務的序列化的方法
  • autoretry_for:設置在特定異常時重試任務,默認False不重試。
  • retry_backoff:默認Flase,設置重試時的延遲時間間隔策略
  • retry_backoff_max:設置最大延遲重試時間,默認10分鍾,如果失敗則不在重試
  • retry_jitter:默認為True,既引入抖動,避免重試任務集中執行
Task的一般屬性
  • Task.name:任務名稱
  • Task.request:當前任務的信息
  • Task.max_retries:設置重試的最大次數
  • Task.rhrows:預期錯誤類的可選元組
  • Task.rate_limit:設置任務類型的速度限制
  • Task.time_limit:此任務的硬限時,單位為秒
  • Task.serializer:標識要使用的默認序列化方法的字符串

修改task1.py的內容如下

from  apps import app
import celery

celery.Task   # Task的屬性在這里面

class BaseTask(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('執行task失敗')

    def on_success(self, retval, task_id, args, kwargs):
        print(f'執行task成功,task id為:{task_id}')

@app.task(name='wahaha',bind=True,base=BaseTask)
def add(self,x,y):
    print(self.request.id)  # task_id
    return x+y

啟動celery worker

執行demo.py

from apps.task1 import add
from apps.task2 import subs

if __name__ == '__main__':
    add.delay(3,5)
    subs.apply_async(args=[55,33])


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM