celery簡單理解和使用


解決同步阻塞的問題

 

 

 

將耗時任務放到后台異步執行,不影響用戶其他操作。

實現原理

 

任務隊列是一種跨線程,跨機器的機制。

任務隊列中包含稱作任務的工作單元。有專門的進程持續不斷的監視任務隊列,並從中得到新的任務處理。

elery通過消息進行通信,通常使用一個叫Broker(中間人)來協client(任務的發出者)和worker(任務的處理者). clients發出消息到隊列中,broker將隊列中的信息派發給worker來處理。

  一個celery系統可以包含很多的worker和broker,可增強橫向擴展性和高可用性能。

 

 broker

RabbitMQ是一個功能完備,穩定的並且易於安裝的broker. 它是生產環境中最優的選擇。

Redis也是一款功能完備的broker可選項,但是其更可能因意外中斷或者電源故障導致數據丟失的情況。 關於是有那個Redis作為Broker,可訪下面網址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

使用

1.創建應用

首先創建tasks.py模塊

 

from celery import Celery

# 我們這里案例使用redis作為broker
app = Celery('demo', broker='redis://:332572@127.0.0.1/1')

# 創建任務函數
@app.task
def my_task():
    print("任務函數正在執行....")

Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這里我們使用Redis作為中間人。my_task函數是我們編寫的一個任務函數, 通過加上裝飾器app.task, 將其注冊到broker的隊列中。

  現在我們在創建一個worker, 等待處理隊列中的任務.打開終端,cd到tasks.py同級目錄中,執行命令:

celery -A tasks worker --loglevel=info

2.調用任務

任務加入到broker隊列中,以便剛才我們創建的celery workder服務器能夠從隊列中取出任務並執行。如何將任務函數加入到隊列中,可使用delay()。

進入python終端, 執行如下代碼:

from tasks import my_task
my_task.delay()

3.存儲結果

如果我們想跟蹤任務的狀態,Celery需要將結果保存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

  例子我們仍然使用Redis作為存儲結果的方案,任務結果存儲配置我們通過Celery的backend參數來設定。我們將tasks模塊修改如下:

from celery import Celery

# 我們這里案例使用redis作為broker
app = Celery('demo',
             backend='redis://:332572@127.0.0.1:6379/2',
             broker='redis://:332572@127.0.0.1:6379/1')

# 創建任務函數
@app.task
def my_task(a, b):
    print("任務函數正在執行....")
    return a + b

我們給Celery增加了backend參數,指定redis作為結果存儲,並將任務函數修改為兩個參數,並且有返回值。

 

 配置

1.直接通過app來配置

from celery import Celery
app = Celery('demo')
# 增加配置
app.conf.update(
    result_backend='redis://:332572@127.0.0.1:6379/2',
    broker_url='redis://:332572@127.0.0.1:6379/1',
)

2.專有配置文件

對於比較大的項目,我們建議配置信息作為一個單獨的模塊。我們可以通過調用app的函數來告訴Celery使用我們的配置模塊。

配置模塊的名字我們取名為celeryconfig, 這個名字不是固定的,我們可以任意取名,建議這么做。我們必須保證配置模塊能夠被導入。

下面我們在tasks.py模塊 同級目錄下創建配置模塊celeryconfig.py:

result_backend = 'redis://:332572@127.0.0.1:6379/2'
broker_url = 'redis://:332572@127.0.0.1:6379/1'

tasks.py文件修改為:

from celery import Celery
import celeryconfig

# 我們這里案例使用redis作為broker
app = Celery('demo')

# 從單獨的配置模塊中加載配置
app.config_from_object('celeryconfig')

django使用celery示例

1.創建celery_tasks包

創建main.py config.py 具體的任務包eg:sms

在sms包中創建tasks.py

 

tasks.py 代碼

from celery_tasks.main import app

@app.task(name='my_task1')
def my_task1(*args, **kwargs):
    print('執行任務1發送sms短信')

config.py代碼

broker_url = "redis://127.0.0.1/14"

main.py代碼

from celery import Celery

# 為celery使用django配置文件進行設置
import os
if not os.getenv('DJANGO_SETTINGS_MODULE'):
    os.environ['DJANGO_SETTINGS_MODULE'] = 'demo.settings.dev'

# 創建celery應用
app = Celery('my_app')

# 導入celery配置
app.config_from_object('celery_tasks.config')

# 自動注冊celery任務
app.autodiscover_tasks(['celery_tasks.sms'])

啟動celery

celery -A celry_tasks.main worker -l info

 

在需要調用任務的模塊使用

from celery_tasks.sms import tasks as sms_tasks


 sms_tasks.send_sms_code.delay(mobile, sms_code, sms_code_expires)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM