Celery實現周期任務


  這個翻譯之后居然叫芹菜~~最近Django框架需要涉及到執行周期任務~~上網搜了下其實還挺多的(django_crontab:這個學習周期短,但是發現不僅麻煩還不好用啊)、(apscheduler,簡單還行在沒完全掌握Celery時先用它頂了一段時間,但是需要注意不同版本的使用方法差別還挺大的),最后還是決定花時間來學學,Celery是Python語言實現的分布式隊列服務,除了支持即時任務,還支持定時任務。

Celery中的五個核心角色

  • Task

  任務(Task)就是你要做的事情,例如一個注冊流程里面有很多任務,給用戶發驗證郵件就是一個任務,這種耗時任務可以交給Celery去處理,還有一種任務是定時任務,比如每天定時統計網站的注冊人數,這個也可以交給Celery周期性的處理。

  • Broker

  Broker 的中文意思是經紀人,指為市場上買賣雙方提供中介服務的人。在Celery中它介於生產者和消費者之間經紀人,這個角色相當於數據結構中的隊列。例如一個Web系統中,生產者是處理核心業務的Web程序,業務中可能會產生一些耗時的任務,比如短信,生產者會將任務發送給 Broker,就是把這個任務暫時放到隊列中,等待消費者來處理。消費者是 Worker,是專門用於執行任務的后台服務。Worker 將實時監控隊列中是否有新的任務,如果有就拿出來進行處理。Celery 本身不提供隊列服務,一般用 Redis 或者 RabbitMQ 來扮演 Broker 的角色

  • Worker

  Worker 就是那個一直在后台執行任務的人,也稱為任務的消費者,它會實時地監控隊列中有沒有任務,如果有就立即取出來執行。

  • Beat

  Beat 是一個定時任務調度器,它會根據配置定時將任務發送給 Broker,等待 Worker 來消費。

  • Backend

  Backend 用於保存任務的執行結果,每個任務都有返回值,比如發送郵件的服務會告訴我們有沒有發送成功,這個結果就是存在Backend中,當然我們並不總是要關心任務的執行結果。

                                                 

入門操作

1、安裝(Celery4.x 開始不再支持Windows平台,如果需要在Windows開發,請使用3.x的版本)

pip install celery==3.1.15

2、創建一個實例

# 創建一個Celery實例,broker是管道用於儲存任務,官方推薦Redis、RabbitMQ;backend用於存儲任務執行結果
app = Celery("tasks", broker="redis://127.0.0.1:6379", backend='redis://127.0.0.1:6379')

 3、直接上代碼(新建一個celery_task1目錄,里面建下面兩個py文件)

#!/usr/bin/env python3
# -*- encoding:utf-8 -*-
"""
author:Barret
mail:barret_vip@163.com
"""
from celery import Celery
import time

# 創建Celery實例
app = Celery('tasks', broker='redis://127.0.0.1:6379',
                      backend='redis://127.0.0.1:6379',
             )
app.conf.task_protocol = 1

# 創建任務
@app.task
def add(x):
    time.sleep(1)
    print("開始任務了:%s" %x)
    return x
celerys.py
#!/usr/bin/env python3
# -*- encoding:utf-8 -*-
"""
author:Barret
mail:barret_vip@163.com
"""
from celery.result import AsyncResult
import sys

dir = r"D:\today\celery_task1"
sys.path.append(dir)  # 我的任務文件不在環境變量里,IDLE找不到
from celerys import add
reslut = add.delay(1)

# 判斷是否有值,get調用會阻塞
print(reslut) # 可以查看id,也可以使用reslut.id獲取id。

# 異步獲取任務返回值
async_task = AsyncResult(id="45cf4a22-82d5-43f2-a0c9-8fd3af6b1136", app=add)

# 判斷異步任務是否執行成功
if async_task.successful():
    r = reslut.get(propagate=False)  # 出現異常返回異常不觸發異常
    print(r)
else:
    print("任務還沒執行完畢")
cmd

4、執行步驟(不建議直接在Django項目中創建,會出現報錯)

# 1、打開CMD或者Pycharm下的Terminal執行下面命令:
celery -A celerys  worker -l info -P eventlet

# 2、接着運行cmd.py即可打印結果

 結果出現這樣的即運行成功:

入門操作報錯

  • BUG1:如果出現:AttributeError: 'str' object has no attribute 'items',則是redis版本過高(我安裝的是3.x),安裝2.10.6即可~(但是在我安裝2.10之后又說我版本太低,但是當我重新裝回3.x的時候又能用了~~~我只能說玄學好吧)

  •  BUG_2:如果出現下面這個報錯解決辦法是,在實例化Celery時添加下面配置項,這個好像是新協議問題,使用指定到舊協議。

# 在app創建的時候指定
CELERY_TASK_PROTOCOL = 1  # Django中使用這個
app.conf.task_protocol = 1
BUG_2:解決辦法
  • BUG_3:如果想獲取返回結果直接用get方式會報錯,需要指定保存任務結果的位置。
reslut.get()  # 會報錯這里是實例化的時候,沒有定義backend,就是保存任務結果的位置。

  • BUG_3解決:指定輸出到,我這里還是指定到redis中。

好吧我算是把各種報錯都玩了個遍~~~~

  • BUG_4:在執行時又出現下面這個報錯了
Traceback (most recent call last):
  File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
pip install eventlet # 安裝一下
celery -A task1 worker -l info -P eventlet # 將啟動命令改成這個
再去執行cmd.py就沒問題了~~
BUG_4:解決方法
  • 哇~~~~想哭一波三折終於看到輸出了(這一步就代表初步使用沒問題了,接下來我終於可以做周期任務了)

在Django中使用Celery

更新中..................

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM