celery 使用詳解


celery

是啥?

由python 編寫 的異步生產者消費者設計模式下 的實例

舉個例子:

現有兩個進程 生產者進程A 消費者進程B

現在的情況是

邏輯推導:

A 產出栗子 B 要吃栗子 那么這兩個進程必然是 B依賴於A 耦合度很高且是一個耗時操作

 

B -----> (發送請求給A)------->(等待A 產出栗子也許會很久)------->(A響應栗子給B)------->(B得到栗子)

 

B 可能是個很多服務的集成后台之類很忙大忙人不想一直等等等

 

那么 celery 的任務就是 替B 去等

邏輯推導:

A 產出栗子 B 要吃栗子 C celery

B (替我去取栗子)-----> C(發送請求給A)------->(等待A 產出栗子也許會很久)------->(A響應栗子給C)------->(B得到栗子) (C 可以去把栗子存在一個地方B直接去取就好了)

 

那么celery 的本質知道了:一個中間人角色 ,類似快遞小哥,跑腿的

 

作用:

1 防止線程阻塞提高性能

2低耦合解耦 高內聚 高復用

好處 ::每分鍾可以實現數以百萬的任務

特點: 使用消息隊列(broker )在客戶端和消費者之間協調

主體在消息隊列 還有 協調 兩端可以有多個

 

 

好的,現在說celery 的結構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件(message broker): 這個很好理解 ,B指派給C 的任務可能不只一件,所以celery 要把任務存起來一件一件去執行 ,存儲一般是用消息隊列結構的,celery 本身是不帶存儲空間的需要指派位置存 包括,RabbitMQ,Redis,MongoDB

ps : 隊列也是一種數據結構 , 表現形式為一段進入,一段出 先進先出, 不能再中間插入 ,類似管道

 

任務執行單元(worker): Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中 (快遞小哥本體)

 

任務執行結果存儲(task result store):小哥把東西要放到快遞櫃咯 這里也要用到其他服務 包括Redis,MongoDB,Django ORM,AMQP等

 

簡單來說就是 broker 收集任務(收件)worker 執行任務 存在 task result store (派送)

 

這里主要使用redis

celery 簡單配置

 

# 安裝celery
sudo pip install celery -i https://pypi.douban.com/simple
------配置信息  tasks.py-------
BROKER_URL = 'redis://localhost:6379/0'
# 格式 redis://:password@hostname:port/db_number 不過redis 一般不設密碼

from celery import Celery
app = Celery("tasks", broker=BROKER_URL)

@app.task
def add(x,y):
    return x,y

----------啟動------
$ celery -A tasks worker --loglevel=info

#查詢文檔,了解到該命令中-A參數表示的是Celery APP的名稱,這個實例中指的就是tasks.py,后面的tasks就是APP的名稱,worker是一個執行任務角色,
后面的loglevel=info記錄日志類型默認是info,這個命令啟動了一個worker,用來執行程序中add這個加法任務(task)。

啟動成功效果展示

 

celery 配合 django 使用

項目結構

 

項目結構

1安裝

pip install -U celery

 

2創建Celery 實例加載配置

2.1定義 celery_tasks包

2.2 創建celery 實例 啟動文件

 

main.py
------------
# celery啟動文件(獨立於項目的啟動文件單獨啟動)
from celery import Celery # 


# 創建celery實例
celery_app = Celery('abc')
# 加載配置
celery_app.config_from_object('celery_tasks.config')

 

 2.3 加載 celery配置 (celery_tasks.config)

# 指定消息隊列的位置redis上存放隊列
broker_url = 'redis://192.168.103.210/7'

解耦出來的業務 可以放在celery 的子任務夾里面

注冊任務

-----回到main 文件-------
..........
.......
# 自動注冊celery任務
celery_app.autodiscover_tasks(['celery_tasks.sms'])

 

定義任務

案例: 容聯雲通信接口調用

# 加上裝飾器代表載入進去   name:異步任務別名 
@celery_app.task(name='ccp_send_sms_code')
def ccp_send_sms_code(self, mobile, sms_code):
    """
    發送短信異步任務
    :param mobile: 手機號
    :param sms_code: 短信驗證碼
    :return: 成功0 或 失敗-1
    """

    send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID)

    return send_ret

啟動clerery

$ celery -A celery_tasks.main worker -l info

tornado + celery

因為tornado web框架 部分基本仿照了django 所以celery 使用和 django 也是類似的

 結構·

 

 

-------config 文件--------------
# celery
BROKER_URL = 'redis://127.0.0.1:6379/2'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ['json', 'pickle']

CELERY_IMPORTS = (
    'celery_tasks.remove_files.tasks',

)
-----main --------
from celery import Celery
from . import config

import os


app = Celery('task')
app.config_from_object(config)

 

啟動clerery

$ celery -A celery_tasks.main worker -l info

現在celery 服務全部啟用了 只需要在你的項目里面導包調用他就好了

# 解耦處調用該方法時 delay 是celry 自帶的方案可以監聽任務的狀態
ccp_send_sms_code.delay(**kwars)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM