一、安裝
由於celery4.0不支持window,如果在window上安裝celery4.0將會出現下面的錯誤flask_clery
你現在只能安裝
pip install celery==3.1
二、安裝py for redis 模塊
pip install redis
三、安裝redis服務
網上很多文章都寫得模棱兩可,把人坑的不要不要的!!!
Redis對於Linux是官方支持的,但是不支持window,網上很多作者寫文章都不寫具體的系統環境,大多數直接說pip install redis
就可以使用redis了,真的是坑人的玩意,本人深受其毒害
對於windows,如果你需要使用redis服務,那么進入該地址下載
https://github.com/MSOpenTech/redis/releases
redis安裝包,雙擊完成就可以了
如果你在window上不安裝該redis包,將會提示redis.exceptions.ConnectionError: Error 10061 connecting to localhost:6379.
或者redis.exceptions.ConnectionError
需要注意是:安裝目錄不能安裝在C盤,否則會出現權限依賴錯誤
四、添加redis環境變量
D:\Program Files\Redis
五、初始化redis
進入redis安裝目錄,打開cmd運行命令redis-server.exe redis.windows.conf
,如果出錯
- 雙擊目錄下的
redis-cli.exe
- 在出現的窗口中輸入
shutdown
- 繼續輸入
exit
六、lask 集成celyer
在Flask配置中添加配置
1 |
# Celery 配置 |
- 在flask工程的
__init__
目錄下生產celery實例
注意!!以下代碼必須在 flask app讀取完配置文件后編寫,否則會報錯
1 |
def make_celery(app): |
完整示例如下
1 |
app = Flask(__name__) |
一份比較常用的配置文件
# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URL BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機名' # 指定結果的接受地址 CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db' # 指定任務序列化方式 CELERY_TASK_SERIALIZER = 'msgpack' # 指定結果序列化方式 CELERY_RESULT_SERIALIZER = 'msgpack' # 任務過期時間,celery任務執行結果的超時時間 CELERY_TASK_RESULT_EXPIRES = 60 * 20 # 指定任務接受的序列化類型. CELERY_ACCEPT_CONTENT = ["msgpack"] # 任務發送完成是否需要確認,這一項對性能有一點影響 CELERY_ACKS_LATE = True # 壓縮方案選擇,可以是zlib, bzip2,默認是發送沒有壓縮的數據 CELERY_MESSAGE_COMPRESSION = 'zlib' # 規定完成任務的時間 CELERYD_TASK_TIME_LIMIT = 5 # 在5s內完成任務,否則執行該任務的worker將被殺死,任務移交給父進程 # celery worker的並發數,默認是服務器的內核數目,也是命令行-c參數指定的數目 CELERYD_CONCURRENCY = 4 # celery worker 每次去rabbitmq預取任務的數量 CELERYD_PREFETCH_MULTIPLIER = 4 # 每個worker執行了多少任務就會死掉,默認是無限的 CELERYD_MAX_TASKS_PER_CHILD = 40 # 設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數據都會發送到默認的隊列中 CELERY_DEFAULT_QUEUE = "default" # 設置詳細的隊列 CELERY_QUEUES = { "default": { # 這是上面指定的默認隊列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列 "routing_key": "topic.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "task_eeg": { # 設置扇形交換機 "exchange": "tasks", "exchange_type": "fanout", "binding_key": "tasks", }, }
在cmd中啟動celery服務
執行命令celery -A your_application.celery worker loglevel=info,your_application為你工程的名字,在這里為 get_tieba_film
調用
1 |
@app.route('/') |
綁定
一個綁定任務意味着任務函數的第一個參數總是任務實例本身(self),就像 Python 綁定方法類似:
1 |
|
任務繼承
任務裝飾器的 base 參數可以聲明任務的基類
1 |
import celery |
任務名稱
每個任務必須有不同的名稱。
如果沒有顯示提供名稱,任務裝飾器將會自動產生一個,產生的名稱會基於這些信息:
1)任務定義所在的模塊,
2)任務函數的名稱
顯示設置任務名稱的例子:
1 |
>>> @app.task(name='sum-of-two-numbers') |
最佳實踐是使用模塊名稱作為命名空間,這樣的話如果有一個同名任務函數定義在其他模塊也不會產生沖突。
1 |
>>> @app.task(name='tasks.add') |
七、安裝flower
將各個任務的執行情況、各個worker的健康狀態進行監控並以可視化的方式展現
1 |
pip install flower |
啟動flower(默認會啟動一個webserver,端口為5555):
1 |
指定broker並啟動: celery flower --broker=amqp://guest:guest@localhost:5672// 或
|
八、常見錯誤
1 |
ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0: |
原因是:redis-server 沒有啟動
解決方案:到redis安裝目錄下執行redis-server.exe redis.windows.conf
檢查redis是否啟動:redis-cli ping
1 |
line 442, in on_task_received |
解決:
1 |
Did you remember to import the module containing this task? |
原因:任務沒有注冊或注冊不成功,只有在啟動的時候提示有任務的時候,才能使用該任務flask_celery
解決:
- 你在那個類中使用celery就在哪個類中執行
celery -A 包名.類名.celery worker -l info
- 根據上一部提示的任務列表給任務設置對應的名稱
如在Test中
1 |
from main import app, celery |
目錄結構:
1 |
+ Card # 工程 |
則應該啟動的命令為:
1 |
celery -A main.Test.celery worker -l info |
同時,如果你的Task.py也有任務,那么你還應該重新創建一個cmd窗口執行
1 |
celery -A main.admin.Task.celery worker -l info |
celery的工作進程可以創建多個flask_celery
flask_celery
參考:
https://www.laoyuyu.me/2018/02/10/python_flask_celery/
https://www.cnblogs.com/cwp-bg/p/8759638.html
celery用戶指南,強烈推薦看
redis安裝
celery使用
https://redis.io/topics/quickstart
http://einverne.github.io/post/2017/05/celery-best-practice.html Celery 最佳實踐
http://orangleliu.info/2014/08/09/celery-best-practice/ Celery最佳實踐-正確使用celery的7條建議
https://www.jianshu.com/p/cc3a0ffb9c76
https://windard.com/opinion/2017/03/18/Task-Queue-Celery 使用 Celery 和 redis 完成任務隊列