基於 Celery 的后台任務¶
如果應用有一個長時間運行的任務,如處理上傳數據或者發送電子郵件,而你不想在 請求中等待任務結束,那么可以使用任務隊列發送必須的數據給另一個進程。這樣就 可以在后台運行任務,立即返回請求。
Celery 是強大的任務隊列庫,它可以用於簡單的后台任務,也可用於復雜的多階段 應用的計划。本文主要說明如何在 Flask 中配置使用 Celery 。本文假設你 已經閱讀過了其官方文檔中的 Celery 入門
安裝
Celery 是一個獨立的 Python 包。使用 pip 從 PyPI 安裝:
$ pip install celery
配置
你首先需要有一個 Celery 實例,這個實例稱為 celery 應用。其地位就相當於 Flask 中 Flask
一樣。這個實例被用作所有 Celery 相關事務的 入口,如創建任務和管理工人,因此它必須可以被其他模塊導入。
例如,你可以把它放在一個 tasks
模塊中。這樣不需要重新配置,你就可以使用 tasks 的子類,增加 Flask 應用情境的支持,並鈎接 Flask 的配置。
只要如下這樣就可以在 Falsk 中使用 Celery 了:
from celery import Celery def make_celery(app): celery = Celery( app.import_name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery
這個函數創建了一個新的 Celery 對象,使用了應用配置中的 broker ,並從 Flask 配置中更新了 Celery 的其余配置。然后創建了一個任務子類,在一個應用情境中包 裝了任務執行。
一個示例任務
讓我們來寫一個任務,該任務把兩個數字相加並返回結果。我們配置 Celery 的 broker ,后端使用 Redis 。使用上文的工廠創建一個 celery
應用,並用它定 義任務。:
from flask import Flask flask_app = Flask(__name__) flask_app.config.update( CELERY_BROKER_URL='redis://localhost:6379', CELERY_RESULT_BACKEND='redis://localhost:6379' ) celery = make_celery(flask_app) @celery.task() def add_together(a, b): return a + b
這個任務現在可以在后台調用了:
result = add_together.delay(23, 42) result.wait() # 65
運行 Celery 工人
至此,如果你已經按上文一步一步執行,你會失望地發現你的 .wait()
不會真正 返回。這是因為還需要運行一個 Celery 工人來接收和執行任務。:
$ celery -A your_application.celery worker
把 your_application
字符串替換為你創建 celery 對像的應用包或模塊。
現在工人已經在運行中,一旦任務結束, wait
就會返回結果