生產者:
- 文件1: 定義任務
#!/usr/bin/env python3
# coding: utf-8
from celery import Celery
import settings
pw = settings.SESSION_REDIS['password']
celery_broker = 'redis://:%s@localhost:6379/0' % pw
celery_backend = celery_broker
app = Celery('tasks', broker=celery_broker, backend=celery_backend)
@app.task
def analysis_main_12(current_id_str, q_num_str):
pass
@app.task
def analysis_main_3(current_id_str, q_num_str):
pass
- 文件2: 產生任務並放到隊列
from celery_tasks import analysis_main_12, analysis_main_3
def main():
......
q = get_q3_from_db()
ret = analysis_main_3.apply_async(args=(str(current_test.id), str(q_num)), queue='for_q_type3')
q = get_q12_from_db()
ret = analysis_main_12.apply_async(args=(str(current_test.id), str(q_num)), queue='for_q_type12') # ret是 "AsyncResult"對象, id 可由 ret.id取得
......
if __name__ == '__main__':
main()
注: 該文件中使用了mytaskfunction.apply_async(...)
而非mytaskfunction.delay(...)
:后者是前者的包裝(使用更方便),而直接使用前者則可使用更多參數,比如queue。
這里的queue正是要在消費者中配置使用的queue,注意名字要對應,不要寫錯。
## 消費者(worker) - 文件1:定義任務函數 除了配置celery app之外,主要工作是配置celery使用的隊列和routes: ``` import config from kombu import Queue, Exchange from celery import Celery
app = Celery('tasks', broker=config.Celery_broker, backend=config.Celery_backend)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s:\t%(message)s')
配置隊列
CELERY_QUEUES = (
Queue('for_q_type3', Exchange('for_q_type3'), routing_key='for_q_type3'), # consumer_arguments={'x-priority': 10}),
Queue('for_q_type12', Exchange('for_q_type12'), routing_key='for_q_type12'), # consumer_arguments={'x-priority': 1}),
Queue('default', Exchange('default'), routing_key='default'),
) # consumer_arguments={'x-priority': 5} 數字越大,優先級越高 - only for rabbitmq?
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_ROUTES = {
# -- HIGH PRIORITY QUEUE -- #
'app.tasks.analysis_main_3': {'queue': 'for_q_type3'},
# -- LOW PRIORITY QUEUE -- #
'app.tasks.analysis_main_12': {'queue': 'for_q_type12'},
'app.tasks.analysis_main': {'queue': 'default'},
}
@app.task
def analysis_main_12(current_id, q_num):
......
your code here
......
@app.task
def analysis_main_3(current_id, q_num):
......
your code here
......
- 啟動命令 或 docker的entrypoint.sh
例如,這是一個entrypoint.sh:
```bash
#!/bin/sh
echo executing entrypoint.sh ...
celery worker -A celery_tasks.app -n worker_Qtype12 -Q for_q_type12 --loglevel=info --concurrency=12 &
celery worker -A celery_tasks.app -n worker_Qtype3 -Q for_q_type3 --loglevel=info --concurrency=8 &
celery flower -A celery_tasks.app --address=0.0.0.0 --port=50080
注意
上述配置中需要注意生產者、消費者和啟動命令三者所用的queue是相對應的,不要寫錯。
上述配置只驗證了多個任務隊列,至於優先隊列功能是否有效未做驗證。 - priority queue 參考:
https://stackoverflow.com/questions/15809811/celery-tasks-that-need-to-run-in-priority