Celery


1.什么是Celery?

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統

專注於實時處理的異步任務隊列,同時也支持任務調度

2.Celery架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

 

消息中間件

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等

任務執行單元

Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等

執行流程:

user相當於提交任務的人,提交給broker也就是消息中間件,worker相當於工人,broker里面有了用戶也就是程序提交的任務,worker就去取出來執行類似於生產者消費者模型,store說簡單點就是worker執行結束后的返回結果

 

版本支持情況

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

 

 注:Celery不支持windows但並不是不可以使用可以用第三方模塊來完成,但是windows上使用出了問題官方不會提供幫助

3.Celery的安裝配置

pip install celery

消息中間件:RabbitMQ/Redis

app=Celery('任務名',backend='xxx',broker='xxx')

4.測試案例

 

# 先導入
from celery import Celery

# 下面是配置信息,這里使用redis為列
# broker='redis://127.0.0.1:6379/2' 不加密碼
# 消息中間件
backend = 'redis://:lmdxxx@139.196.**.**:6380/7'
# 處理結果
broker = 'redis://:lmdxxx@139.196.**.**:6380/8'
# 注:redis有密碼的情況下前面加@輸入密碼即可,最后面的是指定儲存在redis的那個庫中
# 實列話產生一個對celery象,一個項目中可能會用到多個Celery # 第一個參數是當前任務的名字,一定要寫 APP = Celery('test', broker=broker, backend=backend)

 

1.首先沒得說了肯定要先導入celery,上文說了selery有消息中間件,處理者,結果存儲,這里使用redis來作為測試

2.配置消息中間件,配置結果儲存位置

APP = Celery('test', broker=broker, backend=backend)
# 實列話產生一個對celery象,一個項目中可能會用到多個Celery,所以在第一個參數中傳入指定的名稱,不可以重復
# 對象名稱無所謂

3.創建一個selery任務

# 任務其實就是一個函數
# 需要用一個裝飾器去裝飾才能說明這是一個被celery管理的任務,且可以用celery執行
# 裝飾器實際就是實列化出來的那個對象,里面的一個固定方法
@APP.task
def add(x, y):
    import time
    time.sleep(2)
    return x + y

 4.創建一個用於提交任務的

正常同步提交任務

提交任務不執行,有worker才會去執行

注:得到的這個ID其實我們工作中可以把它set到cookie中,用戶就可以通過輪詢去查詢redis數據庫中去尋找結果

也可以直接return返回給前端,給前端處理

 

返回的對應的是存放redis中間件里面的提交任務屬性的ID用於查詢返回結果,看不懂沒關系

5.任務提交任務后需要創建工人執行任務

創建py文件:run.py,執行任務,或者使用命令執行(win不可以使用):celery worker -A celery_task_cs -l info (celery_test_cs是創建任務的那個名字 -l info是打印的日志級別)

windows下:celery worker -A celery_test_cs -l info -P eventlet

win安裝:pip install eventlet

代碼執行通常不用:

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

提交任務的時候注意導入方式,有時候導入方式問題會產生報錯

啟動后如下:

收到任務后分配任務切返回執行信息 7 就是我們的執行結果 前面的就是執行的時間 這個是info級別的日志打印的

 

redis 存放結果數據如下,執行狀態 結果 之類都在里面

6.查看結果

 

流程梳理:

celery的使用
1.先安裝 pip install celery
2.寫一個py文件:celery_task
3.指定broker(消息中間件),指定backend(結果存儲)
4.實例化產生一個Celery對象 app=Celery('名字',broker,backend)
5.加裝飾器綁定任務,在函數(add)上加裝飾器app.task
6.其他程序提交任務,先導入add,add.delay(參數,參數),會將該函數提交到消息中間件,但是並不會執行,有個返回值,直接print會打印出任務的id,以后用id去查詢任務是否執行完成
7.啟動worker去執行任務:
linux:celery worker -A 創建的任務的那個py文件 -l info
windows下:celery worker 創建任務的那個py文件 -A -l info -P eventlet
8.查看結果:根據id去查詢

 

應用場景

異步任務:將耗時操作任務提交給Celery去異步執行,比如生成圖表,發送短信/郵件、消息推送、音視頻處理等等

定時任務:定時執行某件事情,比如每天數據統計

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM