前言
簡單介紹一下celery:Celery 是一個異步任務隊列。你可以使用它在你的應用上下文之外執行任務。總的想法就是你的應用程序可能需要執行任何消耗資源的任務都可以交給任務隊列,讓你的應用程序自由和快速地響應客戶端請求。
官方文檔:https://docs.celeryproject.org/en/stable/
中文文檔:https://www.celerycn.io/ru-men/celery-jian-jie
Flask 中使用 Celery:http://www.pythondoc.com/flask-celery/first.html
轉發別人寫的,大家可以參考着來:http://www.mamicode.com/info-detail-2309241.html
安裝
因為以下操作在flask中執行,根據網上說的,celery最新版不支持windows系統,所以我這邊安裝的是v3.0版本,最新版還未嘗試,感興趣的同學可以自行操作一番。
中間人采用的是redis,這里選擇低版本的redis安裝使用,不然會報一個不兼容的錯誤。
# 推薦使用國內鏡像源安裝
pip install flask
pip install celery==3.1.25
pip install redis==2.10.6
首次體驗
當安裝上面的包后,我們先在最基礎的flask框架下試一下,能否啟動成功。
代碼實現
這是flask初始化的app.py
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello World!'
if __name__ == '__main__':
app.run(debug=True)
配置后的app.py
import time
from flask import Flask
from celery import Celery
app = Flask(__name__)
# 這里是配置中間人redis基礎寫法,可以照抄。如果是服務器,需要更換(127.0.0.1)
app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://127.0.0.1:6379/0'
# 初始化celery
celery_ = Celery(app.name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'])
celery_.conf.update(app.config)
@app.route('/')
def hello_world():
return 'Hello World!'
# 這里定義后台任務,異步執行裝飾器@celery_.task
@celery_.task
def my_background_task(arg1):
# some long running task here
time.sleep(5)
result = arg1
print(result.id)
return result
# 以下是兩個視圖,測試celery能否正常使用
@app.route("/index")
def index():
"""一個測試的實例"""
# print(my_background_task(3)) # add函數也能做普通的函數使用
result = my_background_task.apply_async(args=[5, ]) # 發送異步任務,指定隊列
return result.id
@app.route("/index/<result_id>")
def get_result(result_id):
# 根據任務ID獲取任務結果
print(result_id)
# 這里請注意,網上很多教程只有“AsyncResult”,但這是不對的
# 這里要調用的是實例化后的celery的對象方法才行,如果不是這樣,啟動celery服務后會報一個配置后端錯誤
result = celery_.AsyncResult(id=result_id)
print(result)
return str(result.get())
if __name__ == '__main__':
app.run(debug=True)
啟動服務
我們嘗試在pycharm的窗口下執行命令
注意,一下窗口是另起的一個窗口用來啟動celery服務。
可以通過訪問http://127.0.0.1:5000/index查看
結合flask
先介紹一下我的工程目錄結構,大家根據自己實際情況去設計,畢竟我還是比較菜的,學我可能會誤人子弟,哈哈。
init.py
from celery import Celery
celery = Celery(__name__, broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')
tasks.py
import time
from . import celery
@celery.task()
def add(x, y):
"""
加法
:param x:
:param y:
:return:
"""
time.sleep(10)
return str(x + y)
applocation.py
from flask import Flask
from celery_task import celery
def create_app(config_name):
"""
創建flask應用對象
:param config_name: str 配置模式的模式名字 (“develop”,“product”)
:return:
"""
app = Flask(__name__)
"""
只需要在初始化app中加入這行代碼,將下面的配置信息寫入app的配置文件
app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://127.0.0.1:6379/0'
"""
# 添加實例化需要在app實例之后
celery.conf.update(app.config)
# .........
return app
以上的方式是為了解決循環導入的問題,如果大家有更好的方案,可以優化一下。
接下來的啟動方式與上面一樣,開兩個窗口,一個啟動flask項目,一個啟動celery服務。
!!!注意!!!