這個翻譯之后居然叫芹菜~~最近Django框架需要涉及到執行周期任務~~上網搜了下其實還挺多的(django_crontab:這個學習周期短,但是發現不僅麻煩還不好用啊)、(apscheduler,簡單還行在沒完全掌握Celery時先用它頂了一段時間,但是需要注意不同版本的使用方法差別還挺大的),最后還是決定花時間來學學,Celery是Python語言實現的分布式隊列服務,除了支持即時任務,還支持定時任務。
Celery中的五個核心角色
- Task
任務(Task)就是你要做的事情,例如一個注冊流程里面有很多任務,給用戶發驗證郵件就是一個任務,這種耗時任務可以交給Celery去處理,還有一種任務是定時任務,比如每天定時統計網站的注冊人數,這個也可以交給Celery周期性的處理。
- Broker
Broker 的中文意思是經紀人,指為市場上買賣雙方提供中介服務的人。在Celery中它介於生產者和消費者之間經紀人,這個角色相當於數據結構中的隊列。例如一個Web系統中,生產者是處理核心業務的Web程序,業務中可能會產生一些耗時的任務,比如短信,生產者會將任務發送給 Broker,就是把這個任務暫時放到隊列中,等待消費者來處理。消費者是 Worker,是專門用於執行任務的后台服務。Worker 將實時監控隊列中是否有新的任務,如果有就拿出來進行處理。Celery 本身不提供隊列服務,一般用 Redis 或者 RabbitMQ 來扮演 Broker 的角色
- Worker
Worker 就是那個一直在后台執行任務的人,也稱為任務的消費者,它會實時地監控隊列中有沒有任務,如果有就立即取出來執行。
- Beat
Beat 是一個定時任務調度器,它會根據配置定時將任務發送給 Broker,等待 Worker 來消費。
- Backend
Backend 用於保存任務的執行結果,每個任務都有返回值,比如發送郵件的服務會告訴我們有沒有發送成功,這個結果就是存在Backend中,當然我們並不總是要關心任務的執行結果。
入門操作
1、安裝(Celery4.x 開始不再支持Windows平台,如果需要在Windows開發,請使用3.x的版本)
pip install celery==3.1.15
2、創建一個實例
# 創建一個Celery實例,broker是管道用於儲存任務,官方推薦Redis、RabbitMQ;backend用於存儲任務執行結果 app = Celery("tasks", broker="redis://127.0.0.1:6379", backend='redis://127.0.0.1:6379')
3、直接上代碼(新建一個celery_task1目錄,里面建下面兩個py文件)

#!/usr/bin/env python3 # -*- encoding:utf-8 -*- """ author:Barret mail:barret_vip@163.com """ from celery import Celery import time # 創建Celery實例 app = Celery('tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379', ) app.conf.task_protocol = 1 # 創建任務 @app.task def add(x): time.sleep(1) print("開始任務了:%s" %x) return x

#!/usr/bin/env python3 # -*- encoding:utf-8 -*- """ author:Barret mail:barret_vip@163.com """ from celery.result import AsyncResult import sys dir = r"D:\today\celery_task1" sys.path.append(dir) # 我的任務文件不在環境變量里,IDLE找不到 from celerys import add reslut = add.delay(1) # 判斷是否有值,get調用會阻塞 print(reslut) # 可以查看id,也可以使用reslut.id獲取id。 # 異步獲取任務返回值 async_task = AsyncResult(id="45cf4a22-82d5-43f2-a0c9-8fd3af6b1136", app=add) # 判斷異步任務是否執行成功 if async_task.successful(): r = reslut.get(propagate=False) # 出現異常返回異常不觸發異常 print(r) else: print("任務還沒執行完畢")
4、執行步驟(不建議直接在Django項目中創建,會出現報錯)
# 1、打開CMD或者Pycharm下的Terminal執行下面命令: celery -A celerys worker -l info -P eventlet # 2、接着運行cmd.py即可打印結果
結果出現這樣的即運行成功:
入門操作報錯
- BUG1:如果出現:AttributeError: 'str' object has no attribute 'items',則是redis版本過高(我安裝的是3.x),安裝2.10.6即可~(但是在我安裝2.10之后又說我版本太低,但是當我重新裝回3.x的時候又能用了~~~我只能說玄學好吧)
- BUG_2:如果出現下面這個報錯解決辦法是,在實例化Celery時添加下面配置項,這個好像是新協議問題,使用指定到舊協議。

# 在app創建的時候指定 CELERY_TASK_PROTOCOL = 1 # Django中使用這個 app.conf.task_protocol = 1
- BUG_3:如果想獲取返回結果直接用get方式會報錯,需要指定保存任務結果的位置。
reslut.get() # 會報錯這里是實例化的時候,沒有定義backend,就是保存任務結果的位置。
- BUG_3解決:指定輸出到,我這里還是指定到redis中。
好吧我算是把各種報錯都玩了個遍~~~~
- BUG_4:在執行時又出現下面這個報錯了
Traceback (most recent call last): File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop result = (True, prepare_result(fun(*args, **kwargs))) File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task tasks, accept, hostname = _loc ValueError: not enough values to unpack (expected 3, got 0)

pip install eventlet # 安裝一下 celery -A task1 worker -l info -P eventlet # 將啟動命令改成這個 再去執行cmd.py就沒問題了~~
- 哇~~~~想哭一波三折終於看到輸出了(這一步就代表初步使用沒問題了,接下來我終於可以做周期任務了)
在Django中使用Celery
更新中..................