解決同步阻塞的問題
將耗時任務放到后台異步執行,不影響用戶其他操作。
實現原理
任務隊列是一種跨線程,跨機器的機制。
任務隊列中包含稱作任務的工作單元。有專門的進程持續不斷的監視任務隊列,並從中得到新的任務處理。
elery通過消息進行通信,通常使用一個叫Broker(中間人)來協client(任務的發出者)和worker(任務的處理者). clients發出消息到隊列中,broker將隊列中的信息派發給worker來處理。
一個celery系統可以包含很多的worker和broker,可增強橫向擴展性和高可用性能。
broker
RabbitMQ是一個功能完備,穩定的並且易於安裝的broker. 它是生產環境中最優的選擇。
Redis也是一款功能完備的broker可選項,但是其更可能因意外中斷或者電源故障導致數據丟失的情況。 關於是有那個Redis作為Broker,可訪下面網址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis
使用
1.創建應用
首先創建tasks.py模塊
from celery import Celery # 我們這里案例使用redis作為broker app = Celery('demo', broker='redis://:332572@127.0.0.1/1') # 創建任務函數 @app.task def my_task(): print("任務函數正在執行....")
Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這里我們使用Redis作為中間人。my_task函數是我們編寫的一個任務函數, 通過加上裝飾器app.task, 將其注冊到broker的隊列中。
現在我們在創建一個worker, 等待處理隊列中的任務.打開終端,cd到tasks.py同級目錄中,執行命令:
celery -A tasks worker --loglevel=info
2.調用任務
任務加入到broker隊列中,以便剛才我們創建的celery workder服務器能夠從隊列中取出任務並執行。如何將任務函數加入到隊列中,可使用delay()。
進入python終端, 執行如下代碼:
from tasks import my_task my_task.delay()
3.存儲結果
如果我們想跟蹤任務的狀態,Celery需要將結果保存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。
例子我們仍然使用Redis作為存儲結果的方案,任務結果存儲配置我們通過Celery的backend參數來設定。我們將tasks模塊修改如下:
from celery import Celery # 我們這里案例使用redis作為broker app = Celery('demo', backend='redis://:332572@127.0.0.1:6379/2', broker='redis://:332572@127.0.0.1:6379/1') # 創建任務函數 @app.task def my_task(a, b): print("任務函數正在執行....") return a + b
我們給Celery增加了backend參數,指定redis作為結果存儲,並將任務函數修改為兩個參數,並且有返回值。
配置
1.直接通過app來配置
from celery import Celery app = Celery('demo') # 增加配置 app.conf.update( result_backend='redis://:332572@127.0.0.1:6379/2', broker_url='redis://:332572@127.0.0.1:6379/1', )
2.專有配置文件
對於比較大的項目,我們建議配置信息作為一個單獨的模塊。我們可以通過調用app的函數來告訴Celery使用我們的配置模塊。
配置模塊的名字我們取名為celeryconfig, 這個名字不是固定的,我們可以任意取名,建議這么做。我們必須保證配置模塊能夠被導入。
下面我們在tasks.py模塊 同級目錄下創建配置模塊celeryconfig.py:
result_backend = 'redis://:332572@127.0.0.1:6379/2' broker_url = 'redis://:332572@127.0.0.1:6379/1'
tasks.py文件修改為:
from celery import Celery import celeryconfig # 我們這里案例使用redis作為broker app = Celery('demo') # 從單獨的配置模塊中加載配置 app.config_from_object('celeryconfig')
django使用celery示例
1.創建celery_tasks包
創建main.py config.py 具體的任務包eg:sms
在sms包中創建tasks.py
tasks.py 代碼
from celery_tasks.main import app @app.task(name='my_task1') def my_task1(*args, **kwargs): print('執行任務1發送sms短信')
config.py代碼
broker_url = "redis://127.0.0.1/14"
main.py代碼
from celery import Celery # 為celery使用django配置文件進行設置 import os if not os.getenv('DJANGO_SETTINGS_MODULE'): os.environ['DJANGO_SETTINGS_MODULE'] = 'demo.settings.dev' # 創建celery應用 app = Celery('my_app') # 導入celery配置 app.config_from_object('celery_tasks.config') # 自動注冊celery任務 app.autodiscover_tasks(['celery_tasks.sms'])
啟動celery
celery -A celry_tasks.main worker -l info
在需要調用任務的模塊使用
from celery_tasks.sms import tasks as sms_tasks sms_tasks.send_sms_code.delay(mobile, sms_code, sms_code_expires)