Celery
Celery是一個功能完備即插即用的異步任務隊列系統。它適用於異步處理問題,當發送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作,我們可將其異步執行,這樣用戶不需要等待很久,提高用戶體驗。
文檔:http://docs.jinkan.org/docs/celery/getting-started/index.html
Celery的特點是:
- 簡單,易於使用和維護,有豐富的文檔。
- 高效,單個celery進程每分鍾可以處理數百萬個任務。
- 靈活,celery中幾乎每個部分都可以自定義擴展。
任務隊列是一種跨線程、跨機器工作的一種機制.
任務隊列中包含稱作任務的工作單元。有專門的工作進程持續不斷的監視任務隊列,並從中獲得新的任務並處理.
celery通過消息進行通信,通常使用一個叫Broker(中間人)協同client(任務的發出者)和worker(任務的處理者). clients發出消息到隊列中,broker將隊列中的信息派發給worker來處理。
Celery的架構
Celery的架構由三部分組成,消息隊列(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
一個celery系統可以包含很多的worker和broker
Celery本身不提供消息隊列功能,但是可以很方便地和第三方提供的消息中間件進行集成,包括RabbitMQ,Redis,MongoDB等
安裝
pip install -U celery
也可從官方直接下載安裝包:https://pypi.python.org/pypi/celery/
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py build
python setup.py install
使用
使用celery第一件要做的最為重要的事情是需要先創建一個Celery實例,我們一般叫做celery應用,或者更簡單直接叫做一個app。app應用是我們使用celery所有功能的入口,比如創建任務,管理任務等,在使用celery的時候,app必須能夠被其他的模塊導入。
一般celery任務目錄直接放在項目的根目錄下即可,路徑:
項目根目錄/
├── mycelery/
├── config.py # 配置文件
├── __init__.py
├── main.py # 主程序
└── sms/ # 一個目錄可以放置多個任務,該目錄下存放當前任務執行時需要的模塊或依賴
└── tasks.py # 任務的文件,名稱必須是這個!!!
main.py,代碼:
# 主程序
from celery import Celery
# 創建celery實例對象
app = Celery("test")
# 通過app對象加載配置
app.config_from_object("mycelery.config")
# 自動搜索並加載任務
# 參數必須是一個列表,里面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2",....])
app.autodiscover_tasks(["mycelery.sms","mycelery.cache"])
# 啟動Celery的命令
# 強烈建議切換目錄到項目的根目錄下啟動celery!!
# celery -A mycelerymain worker --loglevel=info
配置文件config.py,代碼:
# 任務隊列的鏈接地址
broker_url = 'redis://127.0.0.1:6379/15'
# 結果隊列的鏈接地址
result_backend = 'redis://127.0.0.1:6379/14'
創建一個任務文件sms/tasks.py,並創建任務,代碼:
# celery的任務必須寫在tasks.py的文件中,別的文件名稱不識別!!!
from mycelery.main import app
# name表示設置任務的名稱,如果不填寫,則默認使用函數名做為任務名
@app.task(name="send_sms")
def send_sms():
print("發送短信!!!")
@app.task(name="send_sms2")
def send_sms2():
print("發送短信任務2!!!")
接下來,我們運行celery,效果如下:
在程序中調用上面的異步任務,拿django進行舉例:
# 調用celery執行異步任務
from my_celery.sms.tasks import send_sms
send_sms.delay(mobile)
其他參考文檔:
http://docs.celeryproject.org/en/latest/getting-started/introduction.html
https://github.com/celery/celery/tree/master/examples/django/
https://www.jianshu.com/p/1840035cb510
https://flower.readthedocs.io/en/latest/screenshots.html
接下來,我們需要把celery和django組合起來一起使用。
把django和celery進行組合
在main.py主程序中對django的配置文件進行加載
# 主程序
import os
from celery import Celery
# 創建celery實例對象
app = Celery("test")
# 把celery和django進行組合,識別和加載django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
# 對django框架執行初始化
import django
django.setup()
# 通過app對象加載配置
app.config_from_object("mycelery.config")
# 加載任務
# 參數必須必須是一個列表,里面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2"])
app.autodiscover_tasks(["mycelery.sms","mycelery.cache"])
# 啟動Celery的命令
# 強烈建議切換目錄到mycelery根目錄下啟動
# celery -A main worker --loglevel=info
在需要使用django配置的任務中,直接加載配置,所以我們把注冊的短信發送功能,整合成一個任務函數,代碼:
from my_celery.main import app
from .yuntongxun.sms import CCP
from myprobject.settings import constants
import logging
log = logging.getLogger("django")
@app.task(name="send_sms")
def send_sms(mobile, sms_code):
"""異步發送短信"""
ccp = CCP()
try:
result = ccp.send_template_sms(mobile, [sms_code, constants.SMS_EXPIRE_TIME//60 ], constants.SMS_TEMPLATE_ID)
return result
except:
log.error("發送短信驗證碼失敗!手機號:%s" % mobile)
在這個任務中,我們需要加載短信發送的sdk和相關的配置常量,所以我們可以直接把django中的短信發送模塊和相關的常量配置文件直接剪切到當前sms任務目錄中
mycelery/
├── config.py
├── __init__.py
├── main.py
└── sms/
├── constant.py
├── __init__.py
├── tasks.py
└── yuntongxun
├── CCPRestSDK.py
├── __init__.py
├── sms.py
└── xmltojson.py
再次啟動項目即可。
最終在django里面,我們調用Celery來異步執行任務。需要完成2個步驟:
# 1. 聲明一個和celery一模一樣的任務函數,但是我們可以導包來解決
from mycelery.sms.tasks import send_sms
# 2. 調用任務函數,發布任務
send_sms.delay(mobile,code)
# send_sms.delay() 如果調用的任務函數沒有參數,則不需要填寫任何內容