Celery淺談


一、Celery 核心模塊

1. Brokers

brokers 中文意思為中間人,在這里就是指任務隊列本身,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker,Brokers 就是生產者和消費者存放/拿取產品的地方(隊列)。Celery 扮演生產者和消費者的角色。

常見的 brokers 有 rabbitmq、redis、Zookeeper 等。推薦用Redis或RabbitMQ實現隊列服務。

2. Workers

就是 Celery 中的工作者,執行任務的單元,類似與生產/消費模型中的消費者。它實時監控消息隊列,如果有任務就從隊列中取出任務並執行它。

3. Backend / Result Stores

用於存儲任務的執行結果。隊列中的任務運行完后的結果或者狀態需要被任務發送者知道,那么就需要一個地方儲存這些結果,就是 Result Stores 了。

常見的 backend 有 redis、Memcached 甚至常用的數據庫都可以。

4. Tasks

就是想在隊列中進行的任務,有異步任務和定時任務。一般由用戶、觸發器或其他操作將任務入隊,然后交由 workers 進行處理。

5. Beat

定時任務調度器,根據配置定時將任務發送給Brokers。

二、Celery 基本使用

1.創建一個celery application 用來定義你的任務列表,創建一個任務文件就叫tasks.py吧。

from celery import Celery
 
# 配置好celery的backend和broker
app = Celery('task1',  backend='redis://127.0.0.1:6379/0', broker='redis://127.0.0.1:6379/0')
  
#普通函數裝飾為 celery task
@app.task 
def add(x, y):
    return x + y

如此而來,我們只是定義好了任務函數func函數和worker(celery對象)。worker相當於工作者。

2.啟動Celery Worker來開始監聽並執行任務。broker 我們有了,backend 我們有了,task 我們也有了,現在就該運行 worker 進行工作了,在 tasks.py 所在目錄下運行:

[root@localhost ~]# celery -A tasks worker --loglevel=info    # 啟動方法1
[root@localhost ~]# celery -A tasks worker --l debug          # 啟動方法2

現在 tasks 這個任務集合的 worker 在進行工作(當然此時broker中還沒有任務,worker此時相當於待命狀態),如果隊列中已經有任務了,就會立即執行。

3.調用任務:要給Worker發送任務,需要調用 delay() 方法。

import time
from tasks import add
 
# 不要直接add(6, 6),這里需要用 celery 提供的接口 delay 進行調用
result = add.delay(6, 6)
while not result.ready():
    time.sleep(1)
print('task done: {0}'.format(result.get()))

三、Celery 進階使用

1.celery_config.py:配置文件

from __future__ import absolute_import, unicode_literals
#從python的絕對路徑導入而不是當前的腳本     #在python2和python3做兼容支持的
 
BROKER_URL = 'redis://127.0.0.1:6379/0'
# 指定結果的接受地址
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

2.tasks.py

from __future__ import absolute_import, unicode_literals
#從python的絕對路徑導入而不是當前的腳本     #在python2和python3做兼容支持的
from celery import Celery
 
# 配置好celery的backend和broker, task1:app的名字。broker
app = Celery('task1',                              #
             broker='redis://127.0.0.1:6379/0',   # 消息隊列:連rabbitmq或redis
             backend='redis://127.0.0.1:6379/0')  # 存儲結果:redis或mongo或其他數據庫
  
app.config_from_object("celery_config")
app.conf.update(         # 給app設置參數
    result_expires=3600, # 保存時間為1小時
)
 
#普通函數裝飾為 celery task
@app.task 
def add(x, y):
    return x + y
     
if __name__ == '__main__':
    app.start()

3.啟動worker

[root@localhost ~]``# celery -A tasks worker --loglevel=info

4.test.py

import time
from tasks import add
 
# 不要直接add(4, 4),這里需要用 celery 提供的接口 delay 進行調用
result = add.delay(6, 6)
print(result.id)
while not result.ready():
    time.sleep(1)
print('task done: {0}'.format(result.get()))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM