舉個小栗子,在生產環境下,我們有兩個異步任務需求,需要分別部署在兩台服務器上,並用不同的隊列實現
- 用戶郵件發送
- pageview統計
主要的注意點,在任務執行時需指定queue,routing_key
文件結構
celery_demo # 項目根目錄
├── celery_app # 存放 celery 相關文件
│ ├── __init__.py
│ ├── celeryconfig.py # 配置文件
│ ├── task1.py # 任務文件 1
│ └── task2.py # 任務文件 2
└── client.py # 應用程序
init.py
from celery import Celery
app = Celery('demo') # 創建 Celery 實例
app.config_from_object('celery_app.celeryconfig') # 通過 Celery 實例加載配置模塊
__all__ = ['app']
celeryconfig.py
from kombu import Queue
from kombu import Exchange
BROKER_URL = 'redis://192.168.31.45:6379/0' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://192.168.31.45:6379/1' # 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai' # 指定時區,默認是 UTC
# CELERY_TIMEZONE='UTC'
CELERY_IMPORTS = ( # 指定導入的任務模塊
'celery_app.task1',
'celery_app.task2'
)
task_queues = (
Queue('default', exchange=Exchange('default'), routing_key='default'),
Queue('email', exchange=Exchange('email'), routing_key='email'),
Queue('pageview', exchange=Exchange('pageview'), routing_key='pageview'),
)
task_routes = {
'celery_app.task1.add': {'queue': 'email', 'routing_key': 'email'},
'celery_app.task2.multiply': {'queue': 'pageview', 'routing_key': 'pageview'},
}
task1.py
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
task2.py
import time
from celery_app import app
@app.task
def multiply(x, y):
time.sleep(2)
return x * y
client.py
from celery_app import task1
from celery_app import task2
task1.add.apply_async(args=[2, 8],queue="email",routing_key="email")
task2.multiply.apply_async(args=[3, 7],queue="pageview",routing_key="pageview")
print('hello world')
啟動woker
server1:
$ celery worker -A celery_app -l info -Q email
server2:
$ celery worker -A celery_app -l info -Q pageview