【python小隨筆】celery異步任務與調用返回值


PS:設置完異步任務后,如果出現文件名與依賴包沖突,那一定是你直接運行文件了,最好是在接口里面執行異步任務規避掉這個問題

s1.py(配置任務文件)

from celery import Celery
import time


my_task = Celery("tasks", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")

# 為應用創建任務,func1
@my_task.task(name="Celery.celery.s1.func2")  # 指定任務路徑(坑)
def func2(x, y):
    time.sleep(3)
    print("2222222222222222222")
    return x+y

 s2.py(執行異步文件)

from celery.result import AsyncResult

from Celery.celery.s1 import func2, my_task
import time

if __name__ == '__main__':
    # 將任務交給Celery的Worker執行
    res = func2.delay(2 ,3)
    # 異步獲取任務返回值
    for i in range(100):
        time.sleep(1)
        async_task = AsyncResult(id=res.id ,app=my_task)
        print("async_task.id" ,async_task.id)
        # 判斷異步任務是否執行成功
        if async_task.successful():
            # 獲取異步任務的返回值
            result = async_task.get()
            print(result)
            print("執行成功")
            break
        else:
            print("任務還未執行完成")

##### 以下是相關參數

@celery.task(bind=True, name='name')
def function_name():
    pass

# task方法參數
name:可以顯式指定任務的名字;默認是模塊的命名空間中本函數的名字。
serializer:指定本任務的序列化的方法;
bind:一個bool值,設置是否綁定一個task的實例,如果綁定,task實例會作為參數傳遞到任務方法中,可以訪問task實例的所有的屬性,即前面反序列化中那些屬性
base:定義任務的基類,可以以此來定義回調函數,默認是Task類,我們也可以定義自己的Task類
default_retry_delay:設置該任務重試的延遲時間,當任務執行失敗后,會自動重試,單位是秒,默認3分鍾;
autoretry_for:設置在特定異常時重試任務,默認False即不重試;
retry_backoff:默認False,設置重試時的延遲時間間隔策略;
retry_backoff_max:設置最大延遲重試時間,默認10分鍾,如果失敗則不再重試;
retry_jitter:默認True,即引入抖動,避免重試任務集中執行;
# 當bind=True時,add函數第一個參數是self,指的是task實例
@task(bind=True)  # 第一個參數是self,使用self.request訪問相關的屬性
def add(self, x, y):
    try:
        logger.info(self.request.id)
    except:
        self.retry() # 當任務失敗則進行重試
import celery

class MyTask(celery.Task):
    # 任務失敗時執行
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))
    # 任務成功時執行
    def on_success(self, retval, task_id, args, kwargs):
        pass
    # 任務重試時執行
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        pass

@task(base=MyTask)
def add(x, y):
    raise KeyError()

#方法相關的參數
exc:失敗時的錯誤的類型;
task_id:任務的id;
args:任務函數的參數;
kwargs:鍵值對參數;
einfo:失敗或重試時的異常詳細信息;
retval:任務成功執行的返回值;

4:TASK的一般屬性:

 

Task.name:任務名稱;
Task.request:當前任務的信息;
Task.max_retries:設置重試的最大次數
Task.throws:預期錯誤類的可選元組,不應被視為實際錯誤,而是結果失敗;
Task.rate_limit:設置此任務類型的速率限制
Task.time_limit:此任務的硬限時(以秒為單位)。
Task.ignore_result:不存儲任務狀態。默認False;
Task.store_errors_even_if_ignored:如果True,即使任務配置為忽略結果,也會存儲錯誤。
Task.serializer:標識要使用的默認序列化方法的字符串。
Task.compression:標識要使用的默認壓縮方案的字符串。默認為task_compression設置。
Task.backend:指定該任務的結果存儲后端用於此任務。
Task.acks_late:如果設置True為此任務的消息將在任務執行后確認 ,而不是在執行任務之前(默認行為),即默認任務執行之前就會發送確認;
Task.track_started:如果True任務在工作人員執行任務時將其狀態報告為“已啟動”。默認是False;

 

獲取任務結果和狀態:

r = task.apply_async()
r.ready()     # 查看任務狀態,返回布爾值,  任務執行完成, 返回 True, 否則返回 False.
r.wait()      # 會阻塞等待任務完成, 返回任務執行結果,很少使用;
r.get(timeout=1)       # 獲取任務執行結果,可以設置等待時間,如果超時但任務未完成返回None;
r.result      # 任務執行結果,未完成返回None;
r.state       # PENDING, START, SUCCESS,任務當前的狀態
r.status      # PENDING, START, SUCCESS,任務當前的狀態
r.successful  # 任務成功返回true
r.traceback  # 如果任務拋出了一個異常,可以獲取原始的回溯信息

自定義發布者,交換機,路由鍵, 隊列, 優先級,序列方案和壓縮方法:

task.apply_async((2,2), 
    compression='zlib',
    serialize='json',
    queue='priority.high',
    routing_key='web.add',
    priority=0,
    exchange='web_exchange')


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM