使用celery的backend異步獲取結果


慣例先貼出相關參考的文檔:

http://docs.celeryproject.org/en/stable/getting-started/next-steps.html

http://docs.celeryproject.org/en/stable/userguide/tasks.html#task-result-backends

 

這篇緊接上篇。

其實我們一般對這種異步任務需求是可能需要回調的。比如說我現在有一個支付的異步任務發送到了隊列。

生產者不需要等待,在發送到隊列之后就告訴用戶已經支付成功了或者說正在支付稍后再來查看狀態。

這個時候celery的worker獲取到這個任務之后開始處理,等一會兒他處理完畢之后可能需要將這個處理完畢的結果返回給發送給他的生產者。

那么問題來了,如何告知生產者?因為生產者生產完消息之后就結束了。

 

那么這個時候就使用到了backend這個參數了:

當我們像指明broker一樣指明了backend之后,當worker運行完結果之后就會把他返回給生產者的唯一id作為鍵,將結果作為值傳遞給你設置的worker(backend測試環境為redis).這個時候你無論在什么時候只要拿着這個值去redis里面查找到結果就行了。比如你在執行一個任務,你可以設置一個周期性輪詢,去查看這個結果是否已經被生產出來,如果生產出來便取到該值做相應的操作即可。下面為具體操作實例:

 

先貼出一個我統一使用的配置文件

# file_name=init_celery.py
#
coding: utf-8 from celery import Celery BROKER_URL = 'redis://:password@localhost:6379/0' BACKEND_URL = 'redis://:password@localhost:6379/1' # Add tasks here CELERY_IMPORTS = ( 'try', ) celery = Celery('celery', broker=BROKER_URL, backend=BACKEND_URL, include=CELERY_IMPORTS,) celery.conf.update( CELERY_ACKS_LATE=True, CELERY_ACCEPT_CONTENT=['pickle', 'json'], CELERYD_FORCE_EXECV=True, CELERYD_MAX_TASKS_PER_CHILD=500, BROKER_HEARTBEAT=0, )

 

然后是celery任何函數

from init_celery import celery

@celery.task
def add(x, y):
    result = x + y
    print result
    return result

 

最后我們調用這個函數執行異步操作

from tasks import add

def notify(a, b):
    result = add.apply_async((a, b), queue='laplace')
    return result

if __name__ == '__main__':
    haha = notify(6, 7)
    print haha.status
    print haha.id
    print haha.get(timeout=1)

這里注意我使用了task.apply_async這個函數。其實效果根task.delay差不多只是可以指定更多的參數。 這里我指定了使用的隊列為laplace隊列,所以對應的監聽的隊列也必須使用參數-Q監聽對應的隊列,才能獲得結果。

這里返回的result其實就是一個唯一的<class 'celery.result.AsyncResult'>對象。

我們可以對這個對象查看他的狀態,id 以及使用get去得到他的具體值。

 

合理的使用這個get方法就可以取得woker處理之后的值,同時在redis可以清楚的看到,worker處理之后自動幫你在redis里面存儲了

 

這么介紹之后對於backend是不是已經一目了然了呢?其實我在進行之類操作的時候還踩了一些配置文件方面的坑,

所以一定要仔細的參考官方文檔進行操作。和查看更多你需要用到的功能和參數。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM