版本:
(celery==4.3,rabbitmq==3.7)
一.使用rabbitmq
# 安裝rabbitMQ(mac) brew install rabbitmq # 配置環境變量(.bash_profile或.profile,注意路徑) export RABBIT_HOME=/usr/local/Cellar/rabbitmq/3.7.15 export PATH=$PATH:$RABBIT_HOME/sbin # 設置RabbitMQ:創建一個用戶,一個虛擬主機並設置權限,(需要先啟動服務) sudo rabbitmqctl add_user myuser mypassword sudo rabbitmqctl add_vhost myvhost # 虛擬主機 sudo rabbitmqctl set_user_tags myuser mytag # administrator sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" #啟動服務 sudo rabbitmq-server # 后台運行 sudo rabbitmq-server -detached # 不要kill(1)來停止服務器 sudo rabbitmqctl stop
二.celery使用
# 安裝
pip install celery
應用
# tasks.py from celery import Celery app = Celery( 'tasks', # 當前模塊的名字 broker='amqp://guest@localhost:port//' # 消息隊列的url ) @app.task def add(x, y): return x + y
運行worker
celery -A tasks worker --loglevel=info
調用任務
# delay from tasks import add add.delay(arg1,arg2,kwarg1='x',kwarg2='y') add.delay(*args, **kwargs).apply_async(args, kwargs) # apply_async task.apply_async(args=[arg1,arg2], kwargs={'kwargs':'x','kwargs':'y'}) tasks.apply_async((arg,), {'kwarg': value}) # 從現在起10秒內執行 tasks.apply_async(countdown=10) # 從現在起10秒內執行,使用指定eta tasks.apply_async(eta=now + timedelta(seconds=10)) # 從現在起一分鍾后執行,但在2分鍾后過期 tasks.apply_async(countdown=60, expires=120) # 在2天后到期,設置使用datetime對象 T.apply_async(expires=now + timedelta(days=2)) # send_task:任務未在當前進程中注冊 app.send_task('任務', args=[arg,], queue='default') # signature用於傳遞任務調用簽名的對象(例如通過網絡發送),並且它們也支持calling api task.s(arg1,arg2,kwarg1='x',kwargs2='y').apply_async()