celery
是啥?
由python 編寫 的異步生產者消費者設計模式下 的實例
舉個例子:
現有兩個進程 生產者進程A 消費者進程B
現在的情況是
邏輯推導:
A 產出栗子 B 要吃栗子 那么這兩個進程必然是 B依賴於A 耦合度很高且是一個耗時操作
B -----> (發送請求給A)------->(等待A 產出栗子也許會很久)------->(A響應栗子給B)------->(B得到栗子)
B 可能是個很多服務的集成后台之類很忙大忙人不想一直等等等
那么 celery 的任務就是 替B 去等
邏輯推導:
A 產出栗子 B 要吃栗子 C celery
B (替我去取栗子)-----> C(發送請求給A)------->(等待A 產出栗子也許會很久)------->(A響應栗子給C)------->(B得到栗子) (C 可以去把栗子存在一個地方B直接去取就好了)
那么celery 的本質知道了:一個中間人角色 ,類似快遞小哥,跑腿的
作用:
1 防止線程阻塞提高性能
2低耦合解耦 高內聚 高復用
好處 ::每分鍾可以實現數以百萬的任務
特點: 使用消息隊列(broker )在客戶端和消費者之間協調
主體在消息隊列 還有 協調 兩端可以有多個
好的,現在說celery 的結構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件(message broker): 這個很好理解 ,B指派給C 的任務可能不只一件,所以celery 要把任務存起來一件一件去執行 ,存儲一般是用消息隊列結構的,celery 本身是不帶存儲空間的需要指派位置存 包括,RabbitMQ,Redis,MongoDB等
ps : 隊列也是一種數據結構 , 表現形式為一段進入,一段出 先進先出, 不能再中間插入 ,類似管道
任務執行單元(worker): Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中 (快遞小哥本體)
任務執行結果存儲(task result store):小哥把東西要放到快遞櫃咯 這里也要用到其他服務 包括Redis,MongoDB,Django ORM,AMQP等
簡單來說就是 broker 收集任務(收件)worker 執行任務 存在 task result store (派送)
這里主要使用redis
celery 簡單配置
# 安裝celery sudo pip install celery -i https://pypi.douban.com/simple ------配置信息 tasks.py------- BROKER_URL = 'redis://localhost:6379/0' # 格式 redis://:password@hostname:port/db_number 不過redis 一般不設密碼 from celery import Celery app = Celery("tasks", broker=BROKER_URL) @app.task def add(x,y): return x,y ----------啟動------ $ celery -A tasks worker --loglevel=info
#查詢文檔,了解到該命令中-A參數表示的是Celery APP的名稱,這個實例中指的就是tasks.py,后面的tasks就是APP的名稱,worker是一個執行任務角色,
后面的loglevel=info記錄日志類型默認是info,這個命令啟動了一個worker,用來執行程序中add這個加法任務(task)。
啟動成功效果展示
celery 配合 django 使用
項目結構
項目結構
1安裝
pip install -U celery
2.1定義 celery_tasks包
2.2 創建celery 實例 啟動文件
main.py ------------ # celery啟動文件(獨立於項目的啟動文件單獨啟動) from celery import Celery # # 創建celery實例 celery_app = Celery('abc') # 加載配置 celery_app.config_from_object('celery_tasks.config')
2.3 加載 celery配置 (celery_tasks.config)
# 指定消息隊列的位置redis上存放隊列 broker_url = 'redis://192.168.103.210/7'
解耦出來的業務 可以放在celery 的子任務夾里面
注冊任務
-----回到main 文件------- .......... ....... # 自動注冊celery任務 celery_app.autodiscover_tasks(['celery_tasks.sms'])
定義任務
案例: 容聯雲通信接口調用
# 加上裝飾器代表載入進去 name:異步任務別名 @celery_app.task(name='ccp_send_sms_code') def ccp_send_sms_code(self, mobile, sms_code): """ 發送短信異步任務 :param mobile: 手機號 :param sms_code: 短信驗證碼 :return: 成功0 或 失敗-1 """ send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID) return send_ret
啟動clerery
$ celery -A celery_tasks.main worker -l info
tornado + celery
因為tornado web框架 部分基本仿照了django 所以celery 使用和 django 也是類似的
結構·
-------config 文件-------------- # celery BROKER_URL = 'redis://127.0.0.1:6379/2' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2' CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_ACCEPT_CONTENT = ['json', 'pickle'] CELERY_IMPORTS = ( 'celery_tasks.remove_files.tasks', )
-----main -------- from celery import Celery from . import config import os app = Celery('task') app.config_from_object(config)
啟動clerery
$ celery -A celery_tasks.main worker -l info
現在celery 服務全部啟用了 只需要在你的項目里面導包調用他就好了
# 解耦處調用該方法時 delay 是celry 自帶的方案可以監聽任務的狀態 ccp_send_sms_code.delay(**kwars)