Celery多隊列配置


Celery多隊列配置

 

Celery官方文檔

項目結構

/proj
-__init__
-app.py                        #實例化celery對象
-celeryconfig.py               #celery的配置文件
-tasks.py                      #celery編寫任務文件

app.py

#coding:utf-8
from __future__ import absolute_import
from celery import Celery

app = Celery('proj', include=['proj.tasks'])     #實例化celery對象

app.config_from_object('proj.celeryconfig')      #引入配置文件

if __name__ == '__main__':                       
    app.start()
  • proj參數為celery的名字
  • include參數為啟動時導入的模塊列表

 tasks.py

#coding:utf-8
from __future__ import absolute_import

from proj.app import app
@app.task()
def add(x, y):
    return x + y

celeryconfig.py

#coding:utf-8
from kombu import Queue

BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//' # 使用RabbitMQ作為消息代理


CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 把任務結果存在了Redis

CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化和反序列化使用msgpack方案

CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間,不建議直接寫86400,應該讓這樣的magic數字表述更明顯

CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的內容類型

CELERY_QUEUES = (    #設置add隊列,綁定routing_key
    Queue('add', routing_key='xue.add'),
)


CELERY_ROUTES = {   #projq.tasks.add這個任務進去add隊列並routeing_key為xue.add  
    'projq.tasks.add': { 
        'queue': 'add',
        'routing_key': 'xue.add',
    }
}
  • CELERY_ACCEPT_CONTENT的類型msgpack為是一種比json更小更快的類型,如果用需要安裝相對應的包。
  • CELERY_QUEUES設置一個指定routing_key的隊列,這個名字可以任意指定。
  • CELERY_ROUTES設置路由,對指定的任務名,指定對應的隊列和routing_key,注意,這里的routing_key需要和上面參數的一致。

 

啟動

在proj的上層目錄輸入

celery -A proj.app worker -Q add -l info
  • proj.tasks.add為任務名稱,也就是在CELERY_ROUTES設置的那個名稱
  • add是設置的queue,key=xue.add是設置的routing_key

 

發布任務

from proj.tasks import add

add.delay(2,3)

 

多隊列中需要修改的地方

CELERY_QUEUES = (    #設置add隊列,綁定routing_key
    Queue('add', routing_key='xue.add'),
)


CELERY_ROUTES = {   #projq.tasks.add這個任務進去add隊列並routeing_key為xue.add  
    'projq.tasks.add': { 
        'queue': 'add',
        'routing_key': 'xue.add',
    }

 

 配置兩個隊列

# 配置隊列
CELERY_QUEUES = (
    Queue('default', routing_key='default'),
    Queue('隊列1', routing_key='key1'),
    Queue('隊列2',  routing_key='key2'),
)
 
# 路由(哪個任務放入哪個隊列)
CELERY_ROUTES = {
    '任務1': {'queue': '隊列1', 'routing_key': 'key1'},
    '任務2': {'queue': '對列2', 'routing_key': 'key2'},
}
關於Celery的監控與管理向導





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM