PS:設置完異步任務后,如果出現文件名與依賴包沖突,那一定是你直接運行文件了,最好是在接口里面執行異步任務規避掉這個問題
s1.py(配置任務文件)
from celery import Celery import time my_task = Celery("tasks", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379") # 為應用創建任務,func1 @my_task.task(name="Celery.celery.s1.func2") # 指定任務路徑(坑) def func2(x, y): time.sleep(3) print("2222222222222222222") return x+y
s2.py(執行異步文件)
from celery.result import AsyncResult from Celery.celery.s1 import func2, my_task import time if __name__ == '__main__': # 將任務交給Celery的Worker執行 res = func2.delay(2 ,3) # 異步獲取任務返回值 for i in range(100): time.sleep(1) async_task = AsyncResult(id=res.id ,app=my_task) print("async_task.id" ,async_task.id) # 判斷異步任務是否執行成功 if async_task.successful(): # 獲取異步任務的返回值 result = async_task.get() print(result) print("執行成功") break else: print("任務還未執行完成")
##### 以下是相關參數
@celery.task(bind=True, name='name') def function_name(): pass # task方法參數 name:可以顯式指定任務的名字;默認是模塊的命名空間中本函數的名字。 serializer:指定本任務的序列化的方法; bind:一個bool值,設置是否綁定一個task的實例,如果綁定,task實例會作為參數傳遞到任務方法中,可以訪問task實例的所有的屬性,即前面反序列化中那些屬性 base:定義任務的基類,可以以此來定義回調函數,默認是Task類,我們也可以定義自己的Task類 default_retry_delay:設置該任務重試的延遲時間,當任務執行失敗后,會自動重試,單位是秒,默認3分鍾; autoretry_for:設置在特定異常時重試任務,默認False即不重試; retry_backoff:默認False,設置重試時的延遲時間間隔策略; retry_backoff_max:設置最大延遲重試時間,默認10分鍾,如果失敗則不再重試; retry_jitter:默認True,即引入抖動,避免重試任務集中執行;
# 當bind=True時,add函數第一個參數是self,指的是task實例 @task(bind=True) # 第一個參數是self,使用self.request訪問相關的屬性 def add(self, x, y): try: logger.info(self.request.id) except: self.retry() # 當任務失敗則進行重試
import celery class MyTask(celery.Task): # 任務失敗時執行 def on_failure(self, exc, task_id, args, kwargs, einfo): print('{0!r} failed: {1!r}'.format(task_id, exc)) # 任務成功時執行 def on_success(self, retval, task_id, args, kwargs): pass # 任務重試時執行 def on_retry(self, exc, task_id, args, kwargs, einfo): pass @task(base=MyTask) def add(x, y): raise KeyError() #方法相關的參數 exc:失敗時的錯誤的類型; task_id:任務的id; args:任務函數的參數; kwargs:鍵值對參數; einfo:失敗或重試時的異常詳細信息; retval:任務成功執行的返回值;
4:TASK的一般屬性:
Task.name:任務名稱;
Task.request:當前任務的信息;
Task.max_retries:設置重試的最大次數
Task.throws:預期錯誤類的可選元組,不應被視為實際錯誤,而是結果失敗;
Task.rate_limit:設置此任務類型的速率限制
Task.time_limit:此任務的硬限時(以秒為單位)。
Task.ignore_result:不存儲任務狀態。默認False;
Task.store_errors_even_if_ignored:如果True,即使任務配置為忽略結果,也會存儲錯誤。
Task.serializer:標識要使用的默認序列化方法的字符串。
Task.compression:標識要使用的默認壓縮方案的字符串。默認為task_compression設置。
Task.backend:指定該任務的結果存儲后端用於此任務。
Task.acks_late:如果設置True為此任務的消息將在任務執行后確認 ,而不是在執行任務之前(默認行為),即默認任務執行之前就會發送確認;
Task.track_started:如果True任務在工作人員執行任務時將其狀態報告為“已啟動”。默認是False;
獲取任務結果和狀態:
r = task.apply_async() r.ready() # 查看任務狀態,返回布爾值, 任務執行完成, 返回 True, 否則返回 False. r.wait() # 會阻塞等待任務完成, 返回任務執行結果,很少使用; r.get(timeout=1) # 獲取任務執行結果,可以設置等待時間,如果超時但任務未完成返回None; r.result # 任務執行結果,未完成返回None; r.state # PENDING, START, SUCCESS,任務當前的狀態 r.status # PENDING, START, SUCCESS,任務當前的狀態 r.successful # 任務成功返回true r.traceback # 如果任務拋出了一個異常,可以獲取原始的回溯信息
自定義發布者,交換機,路由鍵, 隊列, 優先級,序列方案和壓縮方法:
task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')
